mirror of https://github.com/citusdata/citus.git
Merge pull request #627 from citusdata/feature/mod_in_xact
Permit "single-shard" transactions cr: @anarazelpull/653/head
commit
69fbdc90c5
|
@ -94,6 +94,13 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
|
||||||
char *effectiveDatabaseName = NULL;
|
char *effectiveDatabaseName = NULL;
|
||||||
char *effectiveUserName = NULL;
|
char *effectiveUserName = NULL;
|
||||||
|
|
||||||
|
if (IsModifyingTransaction)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
errmsg("cannot open new connections after the first modification "
|
||||||
|
"command within a transaction")));
|
||||||
|
}
|
||||||
|
|
||||||
if (connectionId == INVALID_CONNECTION_ID)
|
if (connectionId == INVALID_CONNECTION_ID)
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
|
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
|
||||||
|
@ -174,6 +181,13 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
|
||||||
return connectionId;
|
return connectionId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (IsModifyingTransaction)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
errmsg("cannot open new connections after the first modification "
|
||||||
|
"command within a transaction")));
|
||||||
|
}
|
||||||
|
|
||||||
/* transcribe connection paremeters to string */
|
/* transcribe connection paremeters to string */
|
||||||
snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE,
|
snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE,
|
||||||
nodeName, nodePort, nodeDatabase, userName, CLIENT_CONNECT_TIMEOUT);
|
nodeName, nodePort, nodeDatabase, userName, CLIENT_CONNECT_TIMEOUT);
|
||||||
|
|
|
@ -11,45 +11,84 @@
|
||||||
* Copyright (c) 2012-2016, Citus Data, Inc.
|
* Copyright (c) 2012-2016, Citus Data, Inc.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "postgres.h"
|
#include "postgres.h" /* IWYU pragma: keep */
|
||||||
#include "c.h"
|
#include "c.h"
|
||||||
#include "fmgr.h"
|
#include "fmgr.h" /* IWYU pragma: keep */
|
||||||
#include "funcapi.h"
|
#include "funcapi.h"
|
||||||
#include "libpq-fe.h"
|
#include "libpq-fe.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
#include "access/xact.h"
|
#include <string.h>
|
||||||
|
|
||||||
|
#include "access/htup.h"
|
||||||
|
#include "access/sdir.h"
|
||||||
#include "access/transam.h"
|
#include "access/transam.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "access/tupdesc.h"
|
||||||
|
#include "access/xact.h"
|
||||||
#include "distributed/citus_clauses.h"
|
#include "distributed/citus_clauses.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/connection_cache.h"
|
#include "distributed/connection_cache.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_planner.h"
|
#include "distributed/multi_planner.h"
|
||||||
#include "distributed/multi_router_executor.h"
|
#include "distributed/multi_router_executor.h"
|
||||||
|
#include "distributed/relay_utility.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
|
#include "executor/execdesc.h"
|
||||||
|
#include "executor/executor.h"
|
||||||
|
#include "executor/instrument.h"
|
||||||
|
#include "executor/tuptable.h"
|
||||||
|
#include "lib/stringinfo.h"
|
||||||
|
#include "nodes/execnodes.h"
|
||||||
|
#include "nodes/nodes.h"
|
||||||
|
#include "nodes/params.h"
|
||||||
|
#include "nodes/parsenodes.h"
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
#include "optimizer/clauses.h"
|
#include "nodes/plannodes.h"
|
||||||
#include "utils/builtins.h"
|
#include "storage/ipc.h"
|
||||||
|
#include "storage/lock.h"
|
||||||
|
#include "tcop/dest.h"
|
||||||
#include "utils/elog.h"
|
#include "utils/elog.h"
|
||||||
#include "utils/errcodes.h"
|
#include "utils/errcodes.h"
|
||||||
|
#include "utils/hsearch.h"
|
||||||
|
#include "utils/int8.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
#include "utils/palloc.h"
|
#include "utils/palloc.h"
|
||||||
#include "utils/int8.h"
|
#include "utils/tuplestore.h"
|
||||||
#if (PG_VERSION_NUM >= 90500)
|
|
||||||
#include "utils/ruleutils.h"
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
/* controls use of locks to enforce safe commutativity */
|
/* controls use of locks to enforce safe commutativity */
|
||||||
bool AllModificationsCommutative = false;
|
bool AllModificationsCommutative = false;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The following static variables are necessary to track the progression of
|
||||||
|
* multi-statement transactions managed by the router executor. After the first
|
||||||
|
* modification within a transaction, the executor populates a hash with the
|
||||||
|
* transaction's initial participants (nodes hit by that initial modification).
|
||||||
|
*
|
||||||
|
* To keep track of the reverse mapping (from shards to nodes), we have a list
|
||||||
|
* of XactShardConnSets, which map a shard identifier to a set of connection
|
||||||
|
* hash entries. This list is walked by MarkRemainingInactivePlacements to
|
||||||
|
* ensure we mark placements as failed if they reject a COMMIT.
|
||||||
|
*
|
||||||
|
* Beyond that, there's a backend hook to register xact callbacks and a flag to
|
||||||
|
* track when a user tries to roll back to a savepoint (not allowed).
|
||||||
|
*/
|
||||||
|
static HTAB *xactParticipantHash = NULL;
|
||||||
|
static List *xactShardConnSetList = NIL;
|
||||||
|
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||||
|
static bool subXactAbortAttempted = false;
|
||||||
|
|
||||||
|
/* functions needed during start phase */
|
||||||
|
static void InitTransactionStateForTask(Task *task);
|
||||||
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
|
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
|
||||||
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
|
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
|
||||||
|
static HTAB * CreateXactParticipantHash(void);
|
||||||
|
|
||||||
|
/* functions needed during run phase */
|
||||||
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
|
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
|
||||||
Task *task,
|
Task *task,
|
||||||
bool isModificationQuery,
|
bool isModificationQuery,
|
||||||
|
@ -57,7 +96,9 @@ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
|
||||||
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
||||||
DestReceiver *destination,
|
DestReceiver *destination,
|
||||||
Tuplestorestate *tupleStore);
|
Tuplestorestate *tupleStore);
|
||||||
static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString);
|
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
|
||||||
|
bool isModificationQuery);
|
||||||
|
static void PurgeConnectionForPlacement(ShardPlacement *placement);
|
||||||
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
||||||
Oid **parameterTypes,
|
Oid **parameterTypes,
|
||||||
const char ***parameterValues);
|
const char ***parameterValues);
|
||||||
|
@ -66,6 +107,16 @@ static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
|
||||||
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
||||||
TupleDesc tupleDescriptor, int64 *rows);
|
TupleDesc tupleDescriptor, int64 *rows);
|
||||||
static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
|
static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
|
||||||
|
static void RecordShardIdParticipant(uint64 affectedShardId,
|
||||||
|
NodeConnectionEntry *participantEntry);
|
||||||
|
|
||||||
|
/* functions needed by callbacks and hooks */
|
||||||
|
static void RegisterRouterExecutorXactCallbacks(void);
|
||||||
|
static void RouterTransactionCallback(XactEvent event, void *arg);
|
||||||
|
static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
|
||||||
|
SubTransactionId parentSubid, void *arg);
|
||||||
|
static void ExecuteTransactionEnd(bool commit);
|
||||||
|
static void MarkRemainingInactivePlacements(void);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -82,12 +133,21 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
|
||||||
/* ensure that the task is not NULL */
|
/* ensure that the task is not NULL */
|
||||||
Assert(task != NULL);
|
Assert(task != NULL);
|
||||||
|
|
||||||
/* disallow transactions and triggers during distributed modify commands */
|
/* disallow triggers during distributed modify commands */
|
||||||
if (commandType != CMD_SELECT)
|
if (commandType != CMD_SELECT)
|
||||||
{
|
{
|
||||||
bool topLevel = true;
|
|
||||||
PreventTransactionChain(topLevel, "distributed commands");
|
|
||||||
eflags |= EXEC_FLAG_SKIP_TRIGGERS;
|
eflags |= EXEC_FLAG_SKIP_TRIGGERS;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We could naturally handle function-based transactions (i.e. those
|
||||||
|
* using PL/pgSQL or similar) by checking the type of queryDesc->dest,
|
||||||
|
* but some customers already use functions that touch multiple shards
|
||||||
|
* from within a function, so we'll ignore functions for now.
|
||||||
|
*/
|
||||||
|
if (IsTransactionBlock() && xactParticipantHash == NULL)
|
||||||
|
{
|
||||||
|
InitTransactionStateForTask(task);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* signal that it is a router execution */
|
/* signal that it is a router execution */
|
||||||
|
@ -122,6 +182,62 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InitTransactionStateForTask is called during executor start with the first
|
||||||
|
* modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the
|
||||||
|
* transaction participant hash, opens connections to this task's nodes, and
|
||||||
|
* populates the hash with those connections after sending BEGIN commands to
|
||||||
|
* each. If a node fails to respond, its connection is set to NULL to prevent
|
||||||
|
* further interaction with it during the transaction.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
InitTransactionStateForTask(Task *task)
|
||||||
|
{
|
||||||
|
ListCell *placementCell = NULL;
|
||||||
|
|
||||||
|
xactParticipantHash = CreateXactParticipantHash();
|
||||||
|
|
||||||
|
foreach(placementCell, task->taskPlacementList)
|
||||||
|
{
|
||||||
|
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
|
||||||
|
NodeConnectionKey participantKey;
|
||||||
|
NodeConnectionEntry *participantEntry = NULL;
|
||||||
|
bool entryFound = false;
|
||||||
|
|
||||||
|
PGconn *connection = NULL;
|
||||||
|
|
||||||
|
MemSet(&participantKey, 0, sizeof(participantKey));
|
||||||
|
strlcpy(participantKey.nodeName, placement->nodeName,
|
||||||
|
MAX_NODE_LENGTH + 1);
|
||||||
|
participantKey.nodePort = placement->nodePort;
|
||||||
|
|
||||||
|
participantEntry = hash_search(xactParticipantHash, &participantKey,
|
||||||
|
HASH_ENTER, &entryFound);
|
||||||
|
Assert(!entryFound);
|
||||||
|
|
||||||
|
connection = GetOrEstablishConnection(placement->nodeName,
|
||||||
|
placement->nodePort);
|
||||||
|
if (connection != NULL)
|
||||||
|
{
|
||||||
|
PGresult *result = PQexec(connection, "BEGIN");
|
||||||
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
WarnRemoteError(connection, result);
|
||||||
|
PurgeConnection(connection);
|
||||||
|
|
||||||
|
connection = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
participantEntry->connection = connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
IsModifyingTransaction = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CommutativityRuleToLockMode determines the commutativity rule for the given
|
* CommutativityRuleToLockMode determines the commutativity rule for the given
|
||||||
* command and returns the appropriate lock mode to enforce that rule. The
|
* command and returns the appropriate lock mode to enforce that rule. The
|
||||||
|
@ -188,6 +304,40 @@ AcquireExecutorShardLock(Task *task, LOCKMODE lockMode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateXactParticipantHash initializes the map used to store the connections
|
||||||
|
* needed to process distributed transactions. Unlike the connection cache, we
|
||||||
|
* permit NULL connections here to signify that a participant has seen an error
|
||||||
|
* and is no longer receiving commands during a transaction. This hash should
|
||||||
|
* be walked at transaction end to send final COMMIT or ABORT commands.
|
||||||
|
*/
|
||||||
|
static HTAB *
|
||||||
|
CreateXactParticipantHash(void)
|
||||||
|
{
|
||||||
|
HTAB *xactParticipantHash = NULL;
|
||||||
|
HASHCTL info;
|
||||||
|
int hashFlags = 0;
|
||||||
|
|
||||||
|
MemSet(&info, 0, sizeof(info));
|
||||||
|
info.keysize = sizeof(NodeConnectionKey);
|
||||||
|
info.entrysize = sizeof(NodeConnectionEntry);
|
||||||
|
info.hcxt = TopTransactionContext;
|
||||||
|
hashFlags = (HASH_ELEM | HASH_CONTEXT);
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 90500)
|
||||||
|
hashFlags |= HASH_BLOBS;
|
||||||
|
#else
|
||||||
|
hashFlags |= HASH_FUNCTION;
|
||||||
|
info.hash = tag_hash;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
xactParticipantHash = hash_create("citus xact participant hash", 32, &info,
|
||||||
|
hashFlags);
|
||||||
|
|
||||||
|
return xactParticipantHash;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RouterExecutorRun actually executes a single task on a worker.
|
* RouterExecutorRun actually executes a single task on a worker.
|
||||||
*/
|
*/
|
||||||
|
@ -328,7 +478,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
List *taskPlacementList = task->taskPlacementList;
|
List *taskPlacementList = task->taskPlacementList;
|
||||||
ListCell *taskPlacementCell = NULL;
|
ListCell *taskPlacementCell = NULL;
|
||||||
List *failedPlacementList = NIL;
|
List *failedPlacementList = NIL;
|
||||||
ListCell *failedPlacementCell = NULL;
|
|
||||||
int64 affectedTupleCount = -1;
|
int64 affectedTupleCount = -1;
|
||||||
bool gotResults = false;
|
bool gotResults = false;
|
||||||
char *queryString = task->queryString;
|
char *queryString = task->queryString;
|
||||||
|
@ -338,10 +487,11 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
PlannedStmt *planStatement = queryDesc->plannedstmt;
|
PlannedStmt *planStatement = queryDesc->plannedstmt;
|
||||||
MultiPlan *multiPlan = GetMultiPlan(planStatement);
|
MultiPlan *multiPlan = GetMultiPlan(planStatement);
|
||||||
Query *query = multiPlan->workerJob->jobQuery;
|
Query *query = multiPlan->workerJob->jobQuery;
|
||||||
|
Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid;
|
||||||
StringInfo queryStringInfo = makeStringInfo();
|
StringInfo queryStringInfo = makeStringInfo();
|
||||||
|
|
||||||
ExecuteMasterEvaluableFunctions(query);
|
ExecuteMasterEvaluableFunctions(query);
|
||||||
DeparseShardQuery(query, task, queryStringInfo);
|
deparse_shard_query(query, relid, task->anchorShardId, queryStringInfo);
|
||||||
queryString = queryStringInfo->data;
|
queryString = queryStringInfo->data;
|
||||||
|
|
||||||
elog(DEBUG4, "query before master evaluation: %s", task->queryString);
|
elog(DEBUG4, "query before master evaluation: %s", task->queryString);
|
||||||
|
@ -355,11 +505,10 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
foreach(taskPlacementCell, taskPlacementList)
|
foreach(taskPlacementCell, taskPlacementList)
|
||||||
{
|
{
|
||||||
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
||||||
char *nodeName = taskPlacement->nodeName;
|
|
||||||
int32 nodePort = taskPlacement->nodePort;
|
|
||||||
bool queryOK = false;
|
bool queryOK = false;
|
||||||
int64 currentAffectedTupleCount = 0;
|
int64 currentAffectedTupleCount = 0;
|
||||||
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
|
PGconn *connection = GetConnectionForPlacement(taskPlacement,
|
||||||
|
isModificationQuery);
|
||||||
|
|
||||||
if (connection == NULL)
|
if (connection == NULL)
|
||||||
{
|
{
|
||||||
|
@ -370,7 +519,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
||||||
if (!queryOK)
|
if (!queryOK)
|
||||||
{
|
{
|
||||||
PurgeConnection(connection);
|
PurgeConnectionForPlacement(taskPlacement);
|
||||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -404,7 +553,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
"to modify "INT64_FORMAT,
|
"to modify "INT64_FORMAT,
|
||||||
currentAffectedTupleCount, affectedTupleCount),
|
currentAffectedTupleCount, affectedTupleCount),
|
||||||
errdetail("modified placement on %s:%d",
|
errdetail("modified placement on %s:%d",
|
||||||
nodeName, nodePort)));
|
taskPlacement->nodeName, taskPlacement->nodePort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
#if (PG_VERSION_NUM < 90600)
|
#if (PG_VERSION_NUM < 90600)
|
||||||
|
@ -427,7 +576,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
PurgeConnection(connection);
|
PurgeConnectionForPlacement(taskPlacement);
|
||||||
|
|
||||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||||
|
|
||||||
|
@ -437,6 +586,8 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
|
|
||||||
if (isModificationQuery)
|
if (isModificationQuery)
|
||||||
{
|
{
|
||||||
|
ListCell *failedPlacementCell = NULL;
|
||||||
|
|
||||||
/* if all placements failed, error out */
|
/* if all placements failed, error out */
|
||||||
if (list_length(failedPlacementList) == list_length(task->taskPlacementList))
|
if (list_length(failedPlacementList) == list_length(task->taskPlacementList))
|
||||||
{
|
{
|
||||||
|
@ -463,16 +614,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
DeparseShardQuery(Query *query, Task *task, StringInfo queryString)
|
|
||||||
{
|
|
||||||
uint64 shardId = task->anchorShardId;
|
|
||||||
Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid;
|
|
||||||
|
|
||||||
deparse_shard_query(query, relid, shardId, queryString);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReturnRowsFromTuplestore moves rows from a given tuplestore into a
|
* ReturnRowsFromTuplestore moves rows from a given tuplestore into a
|
||||||
* receiver. It performs the necessary limiting to support cursors.
|
* receiver. It performs the necessary limiting to support cursors.
|
||||||
|
@ -517,6 +658,99 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetConnectionForPlacement is the main entry point for acquiring a connection
|
||||||
|
* within the router executor. By using placements (rather than node names and
|
||||||
|
* ports) to identify connections, the router executor can keep track of shards
|
||||||
|
* used by multi-statement transactions and error out if a transaction tries
|
||||||
|
* to reach a new node altogether). In the single-statement commands context,
|
||||||
|
* GetConnectionForPlacement simply falls through to GetOrEstablishConnection.
|
||||||
|
*/
|
||||||
|
static PGconn *
|
||||||
|
GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
|
||||||
|
{
|
||||||
|
NodeConnectionKey participantKey;
|
||||||
|
NodeConnectionEntry *participantEntry = NULL;
|
||||||
|
bool entryFound = false;
|
||||||
|
|
||||||
|
/* if not in a transaction, fall through to connection cache */
|
||||||
|
if (xactParticipantHash == NULL)
|
||||||
|
{
|
||||||
|
PGconn *connection = GetOrEstablishConnection(placement->nodeName,
|
||||||
|
placement->nodePort);
|
||||||
|
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert(IsTransactionBlock());
|
||||||
|
|
||||||
|
MemSet(&participantKey, 0, sizeof(participantKey));
|
||||||
|
strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
|
||||||
|
participantKey.nodePort = placement->nodePort;
|
||||||
|
|
||||||
|
participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND,
|
||||||
|
&entryFound);
|
||||||
|
|
||||||
|
if (entryFound)
|
||||||
|
{
|
||||||
|
if (isModificationQuery)
|
||||||
|
{
|
||||||
|
RecordShardIdParticipant(placement->shardId, participantEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
return participantEntry->connection;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
|
||||||
|
errmsg("no transaction participant matches %s:%d",
|
||||||
|
placement->nodeName, placement->nodePort),
|
||||||
|
errdetail("Transactions which modify distributed tables may only "
|
||||||
|
"target nodes affected by the modification command "
|
||||||
|
"which began the transaction.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PurgeConnectionForPlacement provides a way to purge an invalid connection
|
||||||
|
* from all relevant connection hashes using the placement involved in the
|
||||||
|
* query at the time of the error. If a transaction is ongoing, this function
|
||||||
|
* ensures the right node's connection is set to NULL in the participant map
|
||||||
|
* for the transaction in addition to purging the connection cache's entry.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
PurgeConnectionForPlacement(ShardPlacement *placement)
|
||||||
|
{
|
||||||
|
NodeConnectionKey nodeKey;
|
||||||
|
char *currentUser = CurrentUserName();
|
||||||
|
|
||||||
|
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
|
||||||
|
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
|
||||||
|
nodeKey.nodePort = placement->nodePort;
|
||||||
|
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
|
||||||
|
|
||||||
|
PurgeConnectionByKey(&nodeKey);
|
||||||
|
|
||||||
|
if (xactParticipantHash != NULL)
|
||||||
|
{
|
||||||
|
NodeConnectionEntry *participantEntry = NULL;
|
||||||
|
bool entryFound = false;
|
||||||
|
|
||||||
|
Assert(IsTransactionBlock());
|
||||||
|
|
||||||
|
/* the participant hash doesn't use the user field */
|
||||||
|
MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
|
||||||
|
participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
|
||||||
|
&entryFound);
|
||||||
|
|
||||||
|
Assert(entryFound);
|
||||||
|
|
||||||
|
participantEntry->connection = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendQueryInSingleRowMode sends the given query on the connection in an
|
* SendQueryInSingleRowMode sends the given query on the connection in an
|
||||||
* asynchronous way. The function also sets the single-row mode on the
|
* asynchronous way. The function also sets the single-row mode on the
|
||||||
|
@ -830,6 +1064,50 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RecordShardIdParticipant registers a connection as being involved with a
|
||||||
|
* particular shard during a multi-statement transaction.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry)
|
||||||
|
{
|
||||||
|
XactShardConnSet *shardConnSetMatch = NULL;
|
||||||
|
ListCell *listCell = NULL;
|
||||||
|
MemoryContext oldContext = NULL;
|
||||||
|
List *connectionEntryList = NIL;
|
||||||
|
|
||||||
|
/* check whether an entry already exists for this shard */
|
||||||
|
foreach(listCell, xactShardConnSetList)
|
||||||
|
{
|
||||||
|
XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell);
|
||||||
|
|
||||||
|
if (shardConnSet->shardId == affectedShardId)
|
||||||
|
{
|
||||||
|
shardConnSetMatch = shardConnSet;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* entries must last through the whole top-level transaction */
|
||||||
|
oldContext = MemoryContextSwitchTo(TopTransactionContext);
|
||||||
|
|
||||||
|
/* if no entry found, make one */
|
||||||
|
if (shardConnSetMatch == NULL)
|
||||||
|
{
|
||||||
|
shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet));
|
||||||
|
shardConnSetMatch->shardId = affectedShardId;
|
||||||
|
|
||||||
|
xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* add connection, avoiding duplicates */
|
||||||
|
connectionEntryList = shardConnSetMatch->connectionEntryList;
|
||||||
|
shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList,
|
||||||
|
participantEntry);
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RouterExecutorFinish cleans up after a distributed execution.
|
* RouterExecutorFinish cleans up after a distributed execution.
|
||||||
*/
|
*/
|
||||||
|
@ -864,3 +1142,240 @@ RouterExecutorEnd(QueryDesc *queryDesc)
|
||||||
queryDesc->estate = NULL;
|
queryDesc->estate = NULL;
|
||||||
queryDesc->totaltime = NULL;
|
queryDesc->totaltime = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InstallRouterExecutorShmemHook simply installs a hook (intended to be called
|
||||||
|
* once during backend startup), which will itself register all the transaction
|
||||||
|
* callbacks needed by this executor.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
InstallRouterExecutorShmemHook(void)
|
||||||
|
{
|
||||||
|
prev_shmem_startup_hook = shmem_startup_hook;
|
||||||
|
shmem_startup_hook = RegisterRouterExecutorXactCallbacks;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RegisterRouterExecutorXactCallbacks registers (sub-)transaction callbacks
|
||||||
|
* needed by this executor before calling any previous shmem startup hooks.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RegisterRouterExecutorXactCallbacks(void)
|
||||||
|
{
|
||||||
|
RegisterXactCallback(RouterTransactionCallback, NULL);
|
||||||
|
RegisterSubXactCallback(RouterSubtransactionCallback, NULL);
|
||||||
|
|
||||||
|
if (prev_shmem_startup_hook != NULL)
|
||||||
|
{
|
||||||
|
prev_shmem_startup_hook();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RouterTransactionCallback handles committing or aborting remote transactions
|
||||||
|
* after the local one has committed or aborted. It only sends COMMIT or ABORT
|
||||||
|
* commands to still-healthy remotes; the failed ones are marked as inactive if
|
||||||
|
* after a successful COMMIT (no need to mark on ABORTs).
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RouterTransactionCallback(XactEvent event, void *arg)
|
||||||
|
{
|
||||||
|
if (xactParticipantHash == NULL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (event)
|
||||||
|
{
|
||||||
|
#if (PG_VERSION_NUM >= 90500)
|
||||||
|
case XACT_EVENT_PARALLEL_COMMIT:
|
||||||
|
#endif
|
||||||
|
case XACT_EVENT_COMMIT:
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 90500)
|
||||||
|
case XACT_EVENT_PARALLEL_ABORT:
|
||||||
|
#endif
|
||||||
|
case XACT_EVENT_ABORT:
|
||||||
|
{
|
||||||
|
bool commit = false;
|
||||||
|
|
||||||
|
ExecuteTransactionEnd(commit);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* no support for prepare with multi-statement transactions */
|
||||||
|
case XACT_EVENT_PREPARE:
|
||||||
|
case XACT_EVENT_PRE_PREPARE:
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot prepare a transaction that modified "
|
||||||
|
"distributed tables")));
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 90500)
|
||||||
|
case XACT_EVENT_PARALLEL_PRE_COMMIT:
|
||||||
|
#endif
|
||||||
|
case XACT_EVENT_PRE_COMMIT:
|
||||||
|
{
|
||||||
|
bool commit = true;
|
||||||
|
|
||||||
|
if (subXactAbortAttempted)
|
||||||
|
{
|
||||||
|
subXactAbortAttempted = false;
|
||||||
|
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
|
||||||
|
"which modify distributed tables")));
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecuteTransactionEnd(commit);
|
||||||
|
MarkRemainingInactivePlacements();
|
||||||
|
|
||||||
|
/* leave early to avoid resetting transaction state */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* reset transaction state */
|
||||||
|
IsModifyingTransaction = false;
|
||||||
|
xactParticipantHash = NULL;
|
||||||
|
xactShardConnSetList = NIL;
|
||||||
|
subXactAbortAttempted = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RouterSubtransactionCallback silently keeps track of any attempt to ROLLBACK
|
||||||
|
* TO SAVEPOINT, which is not permitted by this executor. At transaction end,
|
||||||
|
* the executor checks whether such a rollback was attempted and, if so, errors
|
||||||
|
* out entirely (with an appropriate message).
|
||||||
|
*
|
||||||
|
* This implementation permits savepoints so long as no rollbacks occur.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
|
||||||
|
SubTransactionId parentSubid, void *arg)
|
||||||
|
{
|
||||||
|
if ((xactParticipantHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB))
|
||||||
|
{
|
||||||
|
subXactAbortAttempted = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecuteTransactionEnd ends any remote transactions still taking place on
|
||||||
|
* remote nodes. It uses xactParticipantHash to know which nodes need any
|
||||||
|
* final COMMIT or ABORT commands. Nodes which fail a final COMMIT will have
|
||||||
|
* their connection field set to NULL to permit placement invalidation.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ExecuteTransactionEnd(bool commit)
|
||||||
|
{
|
||||||
|
const char *sqlCommand = commit ? "COMMIT TRANSACTION" : "ABORT TRANSACTION";
|
||||||
|
HASH_SEQ_STATUS scan;
|
||||||
|
NodeConnectionEntry *participant;
|
||||||
|
bool completed = !commit; /* aborts are assumed completed */
|
||||||
|
|
||||||
|
hash_seq_init(&scan, xactParticipantHash);
|
||||||
|
while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan)))
|
||||||
|
{
|
||||||
|
PGconn *connection = participant->connection;
|
||||||
|
PGresult *result = NULL;
|
||||||
|
|
||||||
|
if (PQstatus(connection) != CONNECTION_OK)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
result = PQexec(connection, sqlCommand);
|
||||||
|
if (PQresultStatus(result) == PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
completed = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
WarnRemoteError(connection, result);
|
||||||
|
PurgeConnection(participant->connection);
|
||||||
|
|
||||||
|
participant->connection = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!completed)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MarkRemainingInactivePlacements takes care of marking placements of a shard
|
||||||
|
* inactive after some of the placements rejected the final COMMIT phase of a
|
||||||
|
* transaction. This step is skipped if all placements reject the COMMIT, since
|
||||||
|
* in that case no modifications to the placement have persisted.
|
||||||
|
*
|
||||||
|
* Failures are detected by checking the connection field of the entries in the
|
||||||
|
* connection set for each shard: it is always set to NULL after errors.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
MarkRemainingInactivePlacements(void)
|
||||||
|
{
|
||||||
|
ListCell *shardConnSetCell = NULL;
|
||||||
|
|
||||||
|
foreach(shardConnSetCell, xactShardConnSetList)
|
||||||
|
{
|
||||||
|
XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell);
|
||||||
|
List *participantList = shardConnSet->connectionEntryList;
|
||||||
|
ListCell *participantCell = NULL;
|
||||||
|
int successes = list_length(participantList); /* assume full success */
|
||||||
|
|
||||||
|
/* determine how many actual successes there were: subtract failures */
|
||||||
|
foreach(participantCell, participantList)
|
||||||
|
{
|
||||||
|
NodeConnectionEntry *participant = NULL;
|
||||||
|
participant = (NodeConnectionEntry *) lfirst(participantCell);
|
||||||
|
|
||||||
|
/* other codes sets connection to NULL after errors */
|
||||||
|
if (participant->connection == NULL)
|
||||||
|
{
|
||||||
|
successes--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* if no nodes succeeded for this shard, don't do anything */
|
||||||
|
if (successes == 0)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* otherwise, ensure failed placements are marked inactive */
|
||||||
|
foreach(participantCell, participantList)
|
||||||
|
{
|
||||||
|
NodeConnectionEntry *participant = NULL;
|
||||||
|
participant = (NodeConnectionEntry *) lfirst(participantCell);
|
||||||
|
|
||||||
|
if (participant->connection == NULL)
|
||||||
|
{
|
||||||
|
uint64 shardId = shardConnSet->shardId;
|
||||||
|
NodeConnectionKey *nodeKey = &participant->cacheKey;
|
||||||
|
uint64 shardLength = 0;
|
||||||
|
|
||||||
|
DeleteShardPlacementRow(shardId, nodeKey->nodeName, nodeKey->nodePort);
|
||||||
|
InsertShardPlacementRow(shardId, FILE_INACTIVE, shardLength,
|
||||||
|
nodeKey->nodeName, nodeKey->nodePort);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -147,6 +147,9 @@ _PG_init(void)
|
||||||
|
|
||||||
/* initialize worker node manager */
|
/* initialize worker node manager */
|
||||||
WorkerNodeRegister();
|
WorkerNodeRegister();
|
||||||
|
|
||||||
|
/* initialize router executor callbacks */
|
||||||
|
InstallRouterExecutorShmemHook();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,9 @@
|
||||||
#include "utils/palloc.h"
|
#include "utils/palloc.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* state needed to prevent new connections during modifying transactions */
|
||||||
|
bool IsModifyingTransaction = false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NodeConnectionHash is the connection hash itself. It begins uninitialized.
|
* NodeConnectionHash is the connection hash itself. It begins uninitialized.
|
||||||
* The first call to GetOrEstablishConnection triggers hash creation.
|
* The first call to GetOrEstablishConnection triggers hash creation.
|
||||||
|
@ -81,9 +84,9 @@ GetOrEstablishConnection(char *nodeName, int32 nodePort)
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
|
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
|
||||||
strncpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH);
|
strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1);
|
||||||
nodeConnectionKey.nodePort = nodePort;
|
nodeConnectionKey.nodePort = nodePort;
|
||||||
strncpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN);
|
strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN);
|
||||||
|
|
||||||
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
|
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
|
||||||
HASH_FIND, &entryFound);
|
HASH_FIND, &entryFound);
|
||||||
|
@ -124,11 +127,10 @@ void
|
||||||
PurgeConnection(PGconn *connection)
|
PurgeConnection(PGconn *connection)
|
||||||
{
|
{
|
||||||
NodeConnectionKey nodeConnectionKey;
|
NodeConnectionKey nodeConnectionKey;
|
||||||
NodeConnectionEntry *nodeConnectionEntry = NULL;
|
|
||||||
bool entryFound = false;
|
|
||||||
char *nodeNameString = NULL;
|
char *nodeNameString = NULL;
|
||||||
char *nodePortString = NULL;
|
char *nodePortString = NULL;
|
||||||
char *nodeUserString = NULL;
|
char *nodeUserString = NULL;
|
||||||
|
PGconn *purgedConnection = NULL;
|
||||||
|
|
||||||
nodeNameString = ConnectionGetOptionValue(connection, "host");
|
nodeNameString = ConnectionGetOptionValue(connection, "host");
|
||||||
if (nodeNameString == NULL)
|
if (nodeNameString == NULL)
|
||||||
|
@ -152,42 +154,54 @@ PurgeConnection(PGconn *connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
|
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
|
||||||
strncpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH);
|
strlcpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH + 1);
|
||||||
nodeConnectionKey.nodePort = pg_atoi(nodePortString, sizeof(int32), 0);
|
nodeConnectionKey.nodePort = pg_atoi(nodePortString, sizeof(int32), 0);
|
||||||
strncpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN);
|
strlcpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN);
|
||||||
|
|
||||||
pfree(nodeNameString);
|
pfree(nodeNameString);
|
||||||
pfree(nodePortString);
|
pfree(nodePortString);
|
||||||
pfree(nodeUserString);
|
pfree(nodeUserString);
|
||||||
|
|
||||||
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
|
purgedConnection = PurgeConnectionByKey(&nodeConnectionKey);
|
||||||
HASH_REMOVE, &entryFound);
|
|
||||||
|
/*
|
||||||
|
* It's possible the provided connection matches the host and port for
|
||||||
|
* an entry in the hash without being precisely the same connection. In
|
||||||
|
* that case, we will want to close the provided connection in addition
|
||||||
|
* to the one from the hash (which was closed by PurgeConnectionByKey).
|
||||||
|
*/
|
||||||
|
if (purgedConnection != connection)
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errmsg("hash entry for \"%s:%d\" contained different "
|
||||||
|
"connection than that provided by caller",
|
||||||
|
nodeConnectionKey.nodeName,
|
||||||
|
nodeConnectionKey.nodePort)));
|
||||||
|
PQfinish(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
PGconn *
|
||||||
|
PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey)
|
||||||
|
{
|
||||||
|
bool entryFound = false;
|
||||||
|
NodeConnectionEntry *nodeConnectionEntry = NULL;
|
||||||
|
|
||||||
|
nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey, HASH_REMOVE,
|
||||||
|
&entryFound);
|
||||||
if (entryFound)
|
if (entryFound)
|
||||||
{
|
{
|
||||||
/*
|
PQfinish(nodeConnectionEntry->connection);
|
||||||
* It's possible the provided connection matches the host and port for
|
|
||||||
* an entry in the hash without being precisely the same connection. In
|
|
||||||
* that case, we will want to close the hash's connection (because the
|
|
||||||
* entry has already been removed) in addition to the provided one.
|
|
||||||
*/
|
|
||||||
if (nodeConnectionEntry->connection != connection)
|
|
||||||
{
|
|
||||||
ereport(WARNING, (errmsg("hash entry for \"%s:%d\" contained different "
|
|
||||||
"connection than that provided by caller",
|
|
||||||
nodeConnectionKey.nodeName,
|
|
||||||
nodeConnectionKey.nodePort)));
|
|
||||||
PQfinish(nodeConnectionEntry->connection);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errcode(ERRCODE_NO_DATA),
|
ereport(WARNING, (errcode(ERRCODE_NO_DATA),
|
||||||
errmsg("could not find hash entry for connection to \"%s:%d\"",
|
errmsg("could not find hash entry for connection to \"%s:%d\"",
|
||||||
nodeConnectionKey.nodeName,
|
nodeConnectionKey->nodeName,
|
||||||
nodeConnectionKey.nodePort)));
|
nodeConnectionKey->nodePort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
PQfinish(connection);
|
return nodeConnectionEntry->connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -370,6 +384,13 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
|
||||||
|
|
||||||
sprintf(nodePortString, "%d", nodePort);
|
sprintf(nodePortString, "%d", nodePort);
|
||||||
|
|
||||||
|
if (IsModifyingTransaction)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
errmsg("cannot open new connections after the first modification "
|
||||||
|
"command within a transaction")));
|
||||||
|
}
|
||||||
|
|
||||||
Assert(sizeof(keywordArray) == sizeof(valueArray));
|
Assert(sizeof(keywordArray) == sizeof(valueArray));
|
||||||
|
|
||||||
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
|
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
|
||||||
|
|
|
@ -399,8 +399,8 @@ TrackerCleanupJobSchemas(void)
|
||||||
cleanupTask->assignedAt = HIGH_PRIORITY_TASK_TIME;
|
cleanupTask->assignedAt = HIGH_PRIORITY_TASK_TIME;
|
||||||
cleanupTask->taskStatus = TASK_ASSIGNED;
|
cleanupTask->taskStatus = TASK_ASSIGNED;
|
||||||
|
|
||||||
strncpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, TASK_CALL_STRING_SIZE);
|
strlcpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, TASK_CALL_STRING_SIZE);
|
||||||
strncpy(cleanupTask->databaseName, databaseName, NAMEDATALEN);
|
strlcpy(cleanupTask->databaseName, databaseName, NAMEDATALEN);
|
||||||
|
|
||||||
/* zero out all other fields */
|
/* zero out all other fields */
|
||||||
cleanupTask->connectionId = INVALID_CONNECTION_ID;
|
cleanupTask->connectionId = INVALID_CONNECTION_ID;
|
||||||
|
|
|
@ -313,13 +313,13 @@ CreateTask(uint64 jobId, uint32 taskId, char *taskCallString)
|
||||||
/* enter the worker task into shared hash and initialize the task */
|
/* enter the worker task into shared hash and initialize the task */
|
||||||
workerTask = WorkerTasksHashEnter(jobId, taskId);
|
workerTask = WorkerTasksHashEnter(jobId, taskId);
|
||||||
workerTask->assignedAt = assignmentTime;
|
workerTask->assignedAt = assignmentTime;
|
||||||
strncpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
|
strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
|
||||||
|
|
||||||
workerTask->taskStatus = TASK_ASSIGNED;
|
workerTask->taskStatus = TASK_ASSIGNED;
|
||||||
workerTask->connectionId = INVALID_CONNECTION_ID;
|
workerTask->connectionId = INVALID_CONNECTION_ID;
|
||||||
workerTask->failureCount = 0;
|
workerTask->failureCount = 0;
|
||||||
strncpy(workerTask->databaseName, databaseName, NAMEDATALEN);
|
strlcpy(workerTask->databaseName, databaseName, NAMEDATALEN);
|
||||||
strncpy(workerTask->userName, userName, NAMEDATALEN);
|
strlcpy(workerTask->userName, userName, NAMEDATALEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -350,13 +350,13 @@ UpdateTask(WorkerTask *workerTask, char *taskCallString)
|
||||||
}
|
}
|
||||||
else if (taskStatus == TASK_PERMANENTLY_FAILED)
|
else if (taskStatus == TASK_PERMANENTLY_FAILED)
|
||||||
{
|
{
|
||||||
strncpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
|
strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
|
||||||
workerTask->failureCount = 0;
|
workerTask->failureCount = 0;
|
||||||
workerTask->taskStatus = TASK_ASSIGNED;
|
workerTask->taskStatus = TASK_ASSIGNED;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
strncpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
|
strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE);
|
||||||
workerTask->failureCount = 0;
|
workerTask->failureCount = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,9 +53,14 @@ typedef struct NodeConnectionEntry
|
||||||
} NodeConnectionEntry;
|
} NodeConnectionEntry;
|
||||||
|
|
||||||
|
|
||||||
|
/* state needed to prevent new connections during modifying transactions */
|
||||||
|
extern bool IsModifyingTransaction;
|
||||||
|
|
||||||
|
|
||||||
/* function declarations for obtaining and using a connection */
|
/* function declarations for obtaining and using a connection */
|
||||||
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
||||||
extern void PurgeConnection(PGconn *connection);
|
extern void PurgeConnection(PGconn *connection);
|
||||||
|
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
|
||||||
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
||||||
extern void WarnRemoteError(PGconn *connection, PGresult *result);
|
extern void WarnRemoteError(PGconn *connection, PGresult *result);
|
||||||
extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
|
extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
|
||||||
|
|
|
@ -14,6 +14,9 @@
|
||||||
#ifndef MULTI_PHYSICAL_PLANNER_H
|
#ifndef MULTI_PHYSICAL_PLANNER_H
|
||||||
#define MULTI_PHYSICAL_PLANNER_H
|
#define MULTI_PHYSICAL_PLANNER_H
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "c.h"
|
||||||
|
|
||||||
#include "datatype/timestamp.h"
|
#include "datatype/timestamp.h"
|
||||||
#include "distributed/citus_nodes.h"
|
#include "distributed/citus_nodes.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
|
|
|
@ -9,9 +9,27 @@
|
||||||
#ifndef MULTI_ROUTER_EXECUTOR_H_
|
#ifndef MULTI_ROUTER_EXECUTOR_H_
|
||||||
#define MULTI_ROUTER_EXECUTOR_H_
|
#define MULTI_ROUTER_EXECUTOR_H_
|
||||||
|
|
||||||
|
#include "c.h"
|
||||||
|
|
||||||
|
#include "access/sdir.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "executor/execdesc.h"
|
#include "executor/execdesc.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* XactShardConnSet keeps track of the mapping from shard to the set of nodes
|
||||||
|
* involved in multi-statement transaction-wrapped modifications of that shard.
|
||||||
|
* This information is used to mark placements inactive at transaction close.
|
||||||
|
*/
|
||||||
|
typedef struct XactShardConnSet
|
||||||
|
{
|
||||||
|
uint64 shardId; /* identifier of the shard that was modified */
|
||||||
|
List *connectionEntryList; /* NodeConnectionEntry pointers to participating nodes */
|
||||||
|
} XactShardConnSet;
|
||||||
|
|
||||||
|
|
||||||
|
/* Config variables managed via guc.c */
|
||||||
extern bool AllModificationsCommutative;
|
extern bool AllModificationsCommutative;
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,5 +37,6 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
|
||||||
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
||||||
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
||||||
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
||||||
|
extern void InstallRouterExecutorShmemHook(void);
|
||||||
|
|
||||||
#endif /* MULTI_ROUTER_EXECUTOR_H_ */
|
#endif /* MULTI_ROUTER_EXECUTOR_H_ */
|
||||||
|
|
|
@ -0,0 +1,507 @@
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000;
|
||||||
|
-- ===================================================================
|
||||||
|
-- test end-to-end modification functionality
|
||||||
|
-- ===================================================================
|
||||||
|
CREATE TABLE researchers (
|
||||||
|
id bigint NOT NULL,
|
||||||
|
lab_id int NOT NULL,
|
||||||
|
name text NOT NULL
|
||||||
|
);
|
||||||
|
CREATE TABLE labs (
|
||||||
|
id bigint NOT NULL,
|
||||||
|
name text NOT NULL
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('researchers', 'lab_id', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('researchers', 2, 2);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_distributed_table('labs', 'id', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('labs', 1, 1);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- add some data
|
||||||
|
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
||||||
|
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
||||||
|
INSERT INTO researchers VALUES (3, 2, 'Tony Hoare');
|
||||||
|
INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson');
|
||||||
|
-- replace a researcher, reusing their id
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM researchers WHERE lab_id = 1 AND id = 2;
|
||||||
|
INSERT INTO researchers VALUES (2, 1, 'John Backus');
|
||||||
|
COMMIT;
|
||||||
|
SELECT name FROM researchers WHERE lab_id = 1 AND id = 2;
|
||||||
|
name
|
||||||
|
-------------
|
||||||
|
John Backus
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- abort a modification
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM researchers WHERE lab_id = 1 AND id = 1;
|
||||||
|
ABORT;
|
||||||
|
SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
|
||||||
|
name
|
||||||
|
--------------
|
||||||
|
Donald Knuth
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- creating savepoints should work...
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
||||||
|
SAVEPOINT hire_thompson;
|
||||||
|
INSERT INTO researchers VALUES (6, 3, 'Ken Thompson');
|
||||||
|
COMMIT;
|
||||||
|
SELECT name FROM researchers WHERE lab_id = 3 AND id = 6;
|
||||||
|
name
|
||||||
|
--------------
|
||||||
|
Ken Thompson
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- even if created by PL/pgSQL...
|
||||||
|
BEGIN;
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO researchers VALUES (10, 10, 'Edsger Dijkstra');
|
||||||
|
EXCEPTION
|
||||||
|
WHEN not_null_violation THEN
|
||||||
|
RAISE NOTICE 'caught not_null_violation';
|
||||||
|
END $$;
|
||||||
|
COMMIT;
|
||||||
|
-- but rollback should not
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO researchers VALUES (7, 4, 'Jim Gray');
|
||||||
|
SAVEPOINT hire_engelbart;
|
||||||
|
INSERT INTO researchers VALUES (8, 4, 'Douglas Engelbart');
|
||||||
|
ROLLBACK TO hire_engelbart;
|
||||||
|
COMMIT;
|
||||||
|
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
|
||||||
|
SELECT name FROM researchers WHERE lab_id = 4;
|
||||||
|
name
|
||||||
|
------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO researchers VALUES (NULL, 10, 'Edsger Dijkstra');
|
||||||
|
EXCEPTION
|
||||||
|
WHEN not_null_violation THEN
|
||||||
|
RAISE NOTICE 'caught not_null_violation';
|
||||||
|
END $$;
|
||||||
|
NOTICE: caught not_null_violation
|
||||||
|
COMMIT;
|
||||||
|
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
|
||||||
|
-- should be valid to edit labs after researchers...
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart');
|
||||||
|
INSERT INTO labs VALUES (5, 'Los Alamos');
|
||||||
|
COMMIT;
|
||||||
|
SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id;
|
||||||
|
id | lab_id | name | id | name
|
||||||
|
----+--------+-------------------+----+------------
|
||||||
|
8 | 5 | Douglas Engelbart | 5 | Los Alamos
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- but not the other way around (would require expanding xact participants)...
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
|
||||||
|
ERROR: no transaction participant matches localhost:57638
|
||||||
|
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
|
||||||
|
COMMIT;
|
||||||
|
-- this logic even applies to router SELECTs occurring after a modification:
|
||||||
|
-- selecting from the modified node is fine...
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ABORT;
|
||||||
|
-- but if a SELECT needs to go to new node, that's a problem...
|
||||||
|
BEGIN;
|
||||||
|
UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
|
||||||
|
FROM pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.nodename = 'localhost'
|
||||||
|
AND sp.nodeport = :worker_1_port
|
||||||
|
AND s.logicalrelid = 'researchers'::regclass;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
|
ERROR: no transaction participant matches localhost:57638
|
||||||
|
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
|
||||||
|
ABORT;
|
||||||
|
-- applies to DDL or COPY, too
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
ALTER TABLE labs ADD COLUMN text motto;
|
||||||
|
ERROR: distributed DDL commands cannot run inside a transaction block
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
\copy labs from stdin delimiter ','
|
||||||
|
ERROR: cannot open new connections after the first modification command within a transaction
|
||||||
|
CONTEXT: COPY labs, line 1: "10,Weyland-Yutani"
|
||||||
|
COMMIT;
|
||||||
|
-- though the copy will work if before any modifications
|
||||||
|
BEGIN;
|
||||||
|
\copy labs from stdin delimiter ','
|
||||||
|
SELECT name FROM labs WHERE id = 10;
|
||||||
|
name
|
||||||
|
----------------
|
||||||
|
Weyland-Yutani
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
COMMIT;
|
||||||
|
-- now, for some special failures...
|
||||||
|
CREATE TABLE objects (
|
||||||
|
id bigint PRIMARY KEY,
|
||||||
|
name text NOT NULL
|
||||||
|
);
|
||||||
|
SELECT master_create_distributed_table('objects', 'id', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('objects', 1, 2);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test primary key violations
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (1, 'orange');
|
||||||
|
ERROR: duplicate key value violates unique constraint "objects_pkey_1200003"
|
||||||
|
DETAIL: Key (id)=(1) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
COMMIT;
|
||||||
|
-- data shouldn't have persisted...
|
||||||
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
id | name
|
||||||
|
----+------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- and placements should still be healthy...
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.shardstate = 1
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- create trigger on one worker to reject certain values
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
|
||||||
|
BEGIN
|
||||||
|
IF (NEW.name = 'BAD') THEN
|
||||||
|
RAISE 'illegal value';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN NEW;
|
||||||
|
END;
|
||||||
|
$rb$ LANGUAGE plpgsql;
|
||||||
|
CREATE CONSTRAINT TRIGGER reject_bad
|
||||||
|
AFTER INSERT ON objects_1200003
|
||||||
|
DEFERRABLE INITIALLY IMMEDIATE
|
||||||
|
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
|
||||||
|
\c - - - :master_port
|
||||||
|
-- test partial failure; worker_1 succeeds, 2 fails
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (2, 'BAD');
|
||||||
|
WARNING: illegal value
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
INSERT INTO labs VALUES (7, 'E Corp');
|
||||||
|
COMMIT;
|
||||||
|
-- data should be persisted
|
||||||
|
SELECT * FROM objects WHERE id = 2;
|
||||||
|
id | name
|
||||||
|
----+------
|
||||||
|
2 | BAD
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM labs WHERE id = 7;
|
||||||
|
id | name
|
||||||
|
----+--------
|
||||||
|
7 | E Corp
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- but one placement should be bad
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.nodename = 'localhost'
|
||||||
|
AND sp.nodeport = :worker_2_port
|
||||||
|
AND sp.shardstate = 3
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM objects;
|
||||||
|
-- mark shards as healthy again; delete all data
|
||||||
|
UPDATE pg_dist_shard_placement AS sp SET shardstate = 1
|
||||||
|
FROM pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
-- what if there are errors on different shards at different times?
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
|
||||||
|
BEGIN
|
||||||
|
IF (NEW.name = 'BAD') THEN
|
||||||
|
RAISE 'illegal value';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN NEW;
|
||||||
|
END;
|
||||||
|
$rb$ LANGUAGE plpgsql;
|
||||||
|
CREATE CONSTRAINT TRIGGER reject_bad
|
||||||
|
AFTER INSERT ON labs_1200002
|
||||||
|
DEFERRABLE INITIALLY IMMEDIATE
|
||||||
|
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
|
||||||
|
\c - - - :master_port
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (2, 'BAD');
|
||||||
|
WARNING: illegal value
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
INSERT INTO labs VALUES (8, 'Aperture Science');
|
||||||
|
INSERT INTO labs VALUES (9, 'BAD');
|
||||||
|
WARNING: illegal value
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: could not modify any active placements
|
||||||
|
COMMIT;
|
||||||
|
-- data should NOT be persisted
|
||||||
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
id | name
|
||||||
|
----+------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT * FROM labs WHERE id = 8;
|
||||||
|
id | name
|
||||||
|
----+------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- all placements should remain healthy
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.shardstate = 1
|
||||||
|
AND (s.logicalrelid = 'objects'::regclass OR
|
||||||
|
s.logicalrelid = 'labs'::regclass);
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- what if the failures happen at COMMIT time?
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
DROP TRIGGER reject_bad ON objects_1200003;
|
||||||
|
CREATE CONSTRAINT TRIGGER reject_bad
|
||||||
|
AFTER INSERT ON objects_1200003
|
||||||
|
DEFERRABLE INITIALLY DEFERRED
|
||||||
|
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
|
||||||
|
\c - - - :master_port
|
||||||
|
-- should be the same story as before, just at COMMIT time
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (2, 'BAD');
|
||||||
|
INSERT INTO labs VALUES (9, 'Umbrella Corporation');
|
||||||
|
COMMIT;
|
||||||
|
WARNING: illegal value
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
-- data should be persisted
|
||||||
|
SELECT * FROM objects WHERE id = 2;
|
||||||
|
id | name
|
||||||
|
----+------
|
||||||
|
2 | BAD
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM labs WHERE id = 7;
|
||||||
|
id | name
|
||||||
|
----+--------
|
||||||
|
7 | E Corp
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- but one placement should be bad
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.nodename = 'localhost'
|
||||||
|
AND sp.nodeport = :worker_2_port
|
||||||
|
AND sp.shardstate = 3
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DELETE FROM objects;
|
||||||
|
-- mark shards as healthy again; delete all data
|
||||||
|
UPDATE pg_dist_shard_placement AS sp SET shardstate = 1
|
||||||
|
FROM pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
-- what if all nodes have failures at COMMIT time?
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
DROP TRIGGER reject_bad ON labs_1200002;
|
||||||
|
CREATE CONSTRAINT TRIGGER reject_bad
|
||||||
|
AFTER INSERT ON labs_1200002
|
||||||
|
DEFERRABLE INITIALLY DEFERRED
|
||||||
|
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
|
||||||
|
\c - - - :master_port
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (2, 'BAD');
|
||||||
|
INSERT INTO labs VALUES (8, 'Aperture Science');
|
||||||
|
INSERT INTO labs VALUES (9, 'BAD');
|
||||||
|
COMMIT;
|
||||||
|
WARNING: illegal value
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
WARNING: illegal value
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: could not commit transaction on any active nodes
|
||||||
|
-- data should NOT be persisted
|
||||||
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
id | name
|
||||||
|
----+------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT * FROM labs WHERE id = 8;
|
||||||
|
id | name
|
||||||
|
----+------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- all placements should remain healthy
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.shardstate = 1
|
||||||
|
AND (s.logicalrelid = 'objects'::regclass OR
|
||||||
|
s.logicalrelid = 'labs'::regclass);
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- what if one shard (objects) succeeds but another (labs) completely fails?
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
DROP TRIGGER reject_bad ON objects_1200003;
|
||||||
|
\c - - - :master_port
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO labs VALUES (8, 'Aperture Science');
|
||||||
|
INSERT INTO labs VALUES (9, 'BAD');
|
||||||
|
COMMIT;
|
||||||
|
WARNING: illegal value
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
-- data to objects should be persisted, but labs should not...
|
||||||
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
id | name
|
||||||
|
----+-------
|
||||||
|
1 | apple
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM labs WHERE id = 8;
|
||||||
|
id | name
|
||||||
|
----+------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- labs should be healthy, but one object placement shouldn't be
|
||||||
|
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND (s.logicalrelid = 'objects'::regclass OR
|
||||||
|
s.logicalrelid = 'labs'::regclass)
|
||||||
|
GROUP BY s.logicalrelid, sp.shardstate
|
||||||
|
ORDER BY s.logicalrelid, sp.shardstate;
|
||||||
|
logicalrelid | shardstate | count
|
||||||
|
--------------+------------+-------
|
||||||
|
labs | 1 | 1
|
||||||
|
objects | 1 | 1
|
||||||
|
objects | 3 | 1
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- some append-partitioned tests for good measure
|
||||||
|
CREATE TABLE append_researchers ( LIKE researchers );
|
||||||
|
SELECT master_create_distributed_table('append_researchers', 'id', 'append');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SELECT master_create_empty_shard('append_researchers') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('append_researchers') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SET citus.shard_replication_factor TO DEFAULT;
|
||||||
|
-- try single-shard INSERT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO append_researchers VALUES (0, 0, 'John Backus');
|
||||||
|
COMMIT;
|
||||||
|
SELECT * FROM append_researchers WHERE id = 0;
|
||||||
|
id | lab_id | name
|
||||||
|
----+--------+-------------
|
||||||
|
0 | 0 | John Backus
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- try rollback
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM append_researchers WHERE id = 0;
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT * FROM append_researchers WHERE id = 0;
|
||||||
|
id | lab_id | name
|
||||||
|
----+--------+-------------
|
||||||
|
0 | 0 | John Backus
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- try hitting shard on other node
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO append_researchers VALUES (1, 1, 'John McCarthy');
|
||||||
|
INSERT INTO append_researchers VALUES (500000, 500000, 'Tony Hoare');
|
||||||
|
ERROR: distributed modifications must target exactly one shard
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT * FROM append_researchers;
|
||||||
|
id | lab_id | name
|
||||||
|
----+--------+-------------
|
||||||
|
0 | 0 | John Backus
|
||||||
|
(1 row)
|
||||||
|
|
|
@ -129,6 +129,7 @@ test: multi_utilities
|
||||||
test: multi_create_insert_proxy
|
test: multi_create_insert_proxy
|
||||||
test: multi_data_types
|
test: multi_data_types
|
||||||
test: multi_repartitioned_subquery_udf
|
test: multi_repartitioned_subquery_udf
|
||||||
|
test: multi_modifying_xacts
|
||||||
|
|
||||||
# ---------
|
# ---------
|
||||||
# multi_copy creates hash and range-partitioned tables and performs COPY
|
# multi_copy creates hash and range-partitioned tables and performs COPY
|
||||||
|
|
|
@ -0,0 +1,400 @@
|
||||||
|
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000;
|
||||||
|
|
||||||
|
|
||||||
|
-- ===================================================================
|
||||||
|
-- test end-to-end modification functionality
|
||||||
|
-- ===================================================================
|
||||||
|
|
||||||
|
CREATE TABLE researchers (
|
||||||
|
id bigint NOT NULL,
|
||||||
|
lab_id int NOT NULL,
|
||||||
|
name text NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE labs (
|
||||||
|
id bigint NOT NULL,
|
||||||
|
name text NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT master_create_distributed_table('researchers', 'lab_id', 'hash');
|
||||||
|
SELECT master_create_worker_shards('researchers', 2, 2);
|
||||||
|
|
||||||
|
SELECT master_create_distributed_table('labs', 'id', 'hash');
|
||||||
|
SELECT master_create_worker_shards('labs', 1, 1);
|
||||||
|
|
||||||
|
-- add some data
|
||||||
|
INSERT INTO researchers VALUES (1, 1, 'Donald Knuth');
|
||||||
|
INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth');
|
||||||
|
INSERT INTO researchers VALUES (3, 2, 'Tony Hoare');
|
||||||
|
INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson');
|
||||||
|
|
||||||
|
-- replace a researcher, reusing their id
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM researchers WHERE lab_id = 1 AND id = 2;
|
||||||
|
INSERT INTO researchers VALUES (2, 1, 'John Backus');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT name FROM researchers WHERE lab_id = 1 AND id = 2;
|
||||||
|
|
||||||
|
-- abort a modification
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM researchers WHERE lab_id = 1 AND id = 1;
|
||||||
|
ABORT;
|
||||||
|
|
||||||
|
SELECT name FROM researchers WHERE lab_id = 1 AND id = 1;
|
||||||
|
|
||||||
|
-- creating savepoints should work...
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie');
|
||||||
|
SAVEPOINT hire_thompson;
|
||||||
|
INSERT INTO researchers VALUES (6, 3, 'Ken Thompson');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT name FROM researchers WHERE lab_id = 3 AND id = 6;
|
||||||
|
|
||||||
|
-- even if created by PL/pgSQL...
|
||||||
|
BEGIN;
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO researchers VALUES (10, 10, 'Edsger Dijkstra');
|
||||||
|
EXCEPTION
|
||||||
|
WHEN not_null_violation THEN
|
||||||
|
RAISE NOTICE 'caught not_null_violation';
|
||||||
|
END $$;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- but rollback should not
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO researchers VALUES (7, 4, 'Jim Gray');
|
||||||
|
SAVEPOINT hire_engelbart;
|
||||||
|
INSERT INTO researchers VALUES (8, 4, 'Douglas Engelbart');
|
||||||
|
ROLLBACK TO hire_engelbart;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT name FROM researchers WHERE lab_id = 4;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO researchers VALUES (NULL, 10, 'Edsger Dijkstra');
|
||||||
|
EXCEPTION
|
||||||
|
WHEN not_null_violation THEN
|
||||||
|
RAISE NOTICE 'caught not_null_violation';
|
||||||
|
END $$;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
-- should be valid to edit labs after researchers...
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart');
|
||||||
|
INSERT INTO labs VALUES (5, 'Los Alamos');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id;
|
||||||
|
|
||||||
|
-- but not the other way around (would require expanding xact participants)...
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- this logic even applies to router SELECTs occurring after a modification:
|
||||||
|
-- selecting from the modified node is fine...
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
|
ABORT;
|
||||||
|
|
||||||
|
-- but if a SELECT needs to go to new node, that's a problem...
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
|
||||||
|
FROM pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.nodename = 'localhost'
|
||||||
|
AND sp.nodeport = :worker_1_port
|
||||||
|
AND s.logicalrelid = 'researchers'::regclass;
|
||||||
|
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
|
ABORT;
|
||||||
|
|
||||||
|
-- applies to DDL or COPY, too
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
ALTER TABLE labs ADD COLUMN text motto;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
\copy labs from stdin delimiter ','
|
||||||
|
10,Weyland-Yutani
|
||||||
|
\.
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- though the copy will work if before any modifications
|
||||||
|
BEGIN;
|
||||||
|
\copy labs from stdin delimiter ','
|
||||||
|
10,Weyland-Yutani
|
||||||
|
\.
|
||||||
|
SELECT name FROM labs WHERE id = 10;
|
||||||
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- now, for some special failures...
|
||||||
|
CREATE TABLE objects (
|
||||||
|
id bigint PRIMARY KEY,
|
||||||
|
name text NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT master_create_distributed_table('objects', 'id', 'hash');
|
||||||
|
SELECT master_create_worker_shards('objects', 1, 2);
|
||||||
|
|
||||||
|
-- test primary key violations
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (1, 'orange');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- data shouldn't have persisted...
|
||||||
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
|
||||||
|
-- and placements should still be healthy...
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.shardstate = 1
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
|
||||||
|
-- create trigger on one worker to reject certain values
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
|
||||||
|
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
|
||||||
|
BEGIN
|
||||||
|
IF (NEW.name = 'BAD') THEN
|
||||||
|
RAISE 'illegal value';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN NEW;
|
||||||
|
END;
|
||||||
|
$rb$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
CREATE CONSTRAINT TRIGGER reject_bad
|
||||||
|
AFTER INSERT ON objects_1200003
|
||||||
|
DEFERRABLE INITIALLY IMMEDIATE
|
||||||
|
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
-- test partial failure; worker_1 succeeds, 2 fails
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (2, 'BAD');
|
||||||
|
INSERT INTO labs VALUES (7, 'E Corp');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- data should be persisted
|
||||||
|
SELECT * FROM objects WHERE id = 2;
|
||||||
|
SELECT * FROM labs WHERE id = 7;
|
||||||
|
|
||||||
|
-- but one placement should be bad
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.nodename = 'localhost'
|
||||||
|
AND sp.nodeport = :worker_2_port
|
||||||
|
AND sp.shardstate = 3
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
|
||||||
|
DELETE FROM objects;
|
||||||
|
|
||||||
|
-- mark shards as healthy again; delete all data
|
||||||
|
UPDATE pg_dist_shard_placement AS sp SET shardstate = 1
|
||||||
|
FROM pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
|
||||||
|
-- what if there are errors on different shards at different times?
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
|
||||||
|
BEGIN
|
||||||
|
IF (NEW.name = 'BAD') THEN
|
||||||
|
RAISE 'illegal value';
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN NEW;
|
||||||
|
END;
|
||||||
|
$rb$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
CREATE CONSTRAINT TRIGGER reject_bad
|
||||||
|
AFTER INSERT ON labs_1200002
|
||||||
|
DEFERRABLE INITIALLY IMMEDIATE
|
||||||
|
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (2, 'BAD');
|
||||||
|
INSERT INTO labs VALUES (8, 'Aperture Science');
|
||||||
|
INSERT INTO labs VALUES (9, 'BAD');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- data should NOT be persisted
|
||||||
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
SELECT * FROM labs WHERE id = 8;
|
||||||
|
|
||||||
|
-- all placements should remain healthy
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.shardstate = 1
|
||||||
|
AND (s.logicalrelid = 'objects'::regclass OR
|
||||||
|
s.logicalrelid = 'labs'::regclass);
|
||||||
|
|
||||||
|
-- what if the failures happen at COMMIT time?
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
|
||||||
|
DROP TRIGGER reject_bad ON objects_1200003;
|
||||||
|
|
||||||
|
CREATE CONSTRAINT TRIGGER reject_bad
|
||||||
|
AFTER INSERT ON objects_1200003
|
||||||
|
DEFERRABLE INITIALLY DEFERRED
|
||||||
|
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
-- should be the same story as before, just at COMMIT time
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (2, 'BAD');
|
||||||
|
INSERT INTO labs VALUES (9, 'Umbrella Corporation');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- data should be persisted
|
||||||
|
SELECT * FROM objects WHERE id = 2;
|
||||||
|
SELECT * FROM labs WHERE id = 7;
|
||||||
|
|
||||||
|
-- but one placement should be bad
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.nodename = 'localhost'
|
||||||
|
AND sp.nodeport = :worker_2_port
|
||||||
|
AND sp.shardstate = 3
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
|
||||||
|
DELETE FROM objects;
|
||||||
|
|
||||||
|
-- mark shards as healthy again; delete all data
|
||||||
|
UPDATE pg_dist_shard_placement AS sp SET shardstate = 1
|
||||||
|
FROM pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND s.logicalrelid = 'objects'::regclass;
|
||||||
|
|
||||||
|
-- what if all nodes have failures at COMMIT time?
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
|
||||||
|
DROP TRIGGER reject_bad ON labs_1200002;
|
||||||
|
|
||||||
|
CREATE CONSTRAINT TRIGGER reject_bad
|
||||||
|
AFTER INSERT ON labs_1200002
|
||||||
|
DEFERRABLE INITIALLY DEFERRED
|
||||||
|
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO objects VALUES (2, 'BAD');
|
||||||
|
INSERT INTO labs VALUES (8, 'Aperture Science');
|
||||||
|
INSERT INTO labs VALUES (9, 'BAD');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- data should NOT be persisted
|
||||||
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
SELECT * FROM labs WHERE id = 8;
|
||||||
|
|
||||||
|
-- all placements should remain healthy
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND sp.shardstate = 1
|
||||||
|
AND (s.logicalrelid = 'objects'::regclass OR
|
||||||
|
s.logicalrelid = 'labs'::regclass);
|
||||||
|
|
||||||
|
-- what if one shard (objects) succeeds but another (labs) completely fails?
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
|
||||||
|
DROP TRIGGER reject_bad ON objects_1200003;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO objects VALUES (1, 'apple');
|
||||||
|
INSERT INTO labs VALUES (8, 'Aperture Science');
|
||||||
|
INSERT INTO labs VALUES (9, 'BAD');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- data to objects should be persisted, but labs should not...
|
||||||
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
SELECT * FROM labs WHERE id = 8;
|
||||||
|
|
||||||
|
-- labs should be healthy, but one object placement shouldn't be
|
||||||
|
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
|
||||||
|
FROM pg_dist_shard_placement AS sp,
|
||||||
|
pg_dist_shard AS s
|
||||||
|
WHERE sp.shardid = s.shardid
|
||||||
|
AND (s.logicalrelid = 'objects'::regclass OR
|
||||||
|
s.logicalrelid = 'labs'::regclass)
|
||||||
|
GROUP BY s.logicalrelid, sp.shardstate
|
||||||
|
ORDER BY s.logicalrelid, sp.shardstate;
|
||||||
|
|
||||||
|
-- some append-partitioned tests for good measure
|
||||||
|
CREATE TABLE append_researchers ( LIKE researchers );
|
||||||
|
|
||||||
|
SELECT master_create_distributed_table('append_researchers', 'id', 'append');
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('append_researchers') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('append_researchers') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor TO DEFAULT;
|
||||||
|
|
||||||
|
-- try single-shard INSERT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO append_researchers VALUES (0, 0, 'John Backus');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT * FROM append_researchers WHERE id = 0;
|
||||||
|
|
||||||
|
-- try rollback
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM append_researchers WHERE id = 0;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
SELECT * FROM append_researchers WHERE id = 0;
|
||||||
|
|
||||||
|
-- try hitting shard on other node
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO append_researchers VALUES (1, 1, 'John McCarthy');
|
||||||
|
INSERT INTO append_researchers VALUES (500000, 500000, 'Tony Hoare');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
SELECT * FROM append_researchers;
|
Loading…
Reference in New Issue