diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index c51c4d79b..83c700133 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -94,6 +94,13 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba char *effectiveDatabaseName = NULL; char *effectiveUserName = NULL; + if (IsModifyingTransaction) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot open new connections after the first modification " + "command within a transaction"))); + } + if (connectionId == INVALID_CONNECTION_ID) { ereport(WARNING, (errmsg("could not allocate connection in connection pool"))); @@ -174,6 +181,13 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD return connectionId; } + if (IsModifyingTransaction) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot open new connections after the first modification " + "command within a transaction"))); + } + /* transcribe connection paremeters to string */ snprintf(connInfoString, STRING_BUFFER_SIZE, CONN_INFO_TEMPLATE, nodeName, nodePort, nodeDatabase, userName, CLIENT_CONNECT_TIMEOUT); diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 491c4e6eb..87bc6bfd6 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -11,45 +11,84 @@ * Copyright (c) 2012-2016, Citus Data, Inc. */ -#include "postgres.h" +#include "postgres.h" /* IWYU pragma: keep */ #include "c.h" -#include "fmgr.h" +#include "fmgr.h" /* IWYU pragma: keep */ #include "funcapi.h" #include "libpq-fe.h" #include "miscadmin.h" -#include "access/xact.h" +#include + +#include "access/htup.h" +#include "access/sdir.h" #include "access/transam.h" -#include "catalog/pg_type.h" +#include "access/tupdesc.h" +#include "access/xact.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" #include "distributed/connection_cache.h" -#include "distributed/listutils.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" +#include "distributed/relay_utility.h" #include "distributed/resource_lock.h" +#include "executor/execdesc.h" +#include "executor/executor.h" +#include "executor/instrument.h" +#include "executor/tuptable.h" +#include "lib/stringinfo.h" +#include "nodes/execnodes.h" +#include "nodes/nodes.h" +#include "nodes/params.h" +#include "nodes/parsenodes.h" #include "nodes/pg_list.h" -#include "optimizer/clauses.h" -#include "utils/builtins.h" +#include "nodes/plannodes.h" +#include "storage/ipc.h" +#include "storage/lock.h" +#include "tcop/dest.h" #include "utils/elog.h" #include "utils/errcodes.h" +#include "utils/hsearch.h" +#include "utils/int8.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/palloc.h" -#include "utils/int8.h" -#if (PG_VERSION_NUM >= 90500) -#include "utils/ruleutils.h" -#endif +#include "utils/tuplestore.h" /* controls use of locks to enforce safe commutativity */ bool AllModificationsCommutative = false; +/* + * The following static variables are necessary to track the progression of + * 
multi-statement transactions managed by the router executor. After the first + * modification within a transaction, the executor populates a hash with the + * transaction's initial participants (nodes hit by that initial modification). + * + * To keep track of the reverse mapping (from shards to nodes), we have a list + * of XactShardConnSets, which map a shard identifier to a set of connection + * hash entries. This list is walked by MarkRemainingInactivePlacements to + * ensure we mark placements as failed if they reject a COMMIT. + * + * Beyond that, there's a backend hook to register xact callbacks and a flag to + * track when a user tries to roll back to a savepoint (not allowed). + */ +static HTAB *xactParticipantHash = NULL; +static List *xactShardConnSetList = NIL; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; +static bool subXactAbortAttempted = false; +/* functions needed during start phase */ +static void InitTransactionStateForTask(Task *task); static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); +static HTAB * CreateXactParticipantHash(void); + +/* functions needed during run phase */ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, bool isModificationQuery, @@ -57,7 +96,9 @@ static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, DestReceiver *destination, Tuplestorestate *tupleStore); -static void DeparseShardQuery(Query *query, Task *task, StringInfo queryString); +static PGconn * GetConnectionForPlacement(ShardPlacement *placement, + bool isModificationQuery); +static void PurgeConnectionForPlacement(ShardPlacement *placement); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -66,6 +107,16 @@ static bool SendQueryInSingleRowMode(PGconn *connection, char *query, static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, TupleDesc tupleDescriptor, int64 *rows); static bool ConsumeQueryResult(PGconn *connection, int64 *rows); +static void RecordShardIdParticipant(uint64 affectedShardId, + NodeConnectionEntry *participantEntry); + +/* functions needed by callbacks and hooks */ +static void RegisterRouterExecutorXactCallbacks(void); +static void RouterTransactionCallback(XactEvent event, void *arg); +static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg); +static void ExecuteTransactionEnd(bool commit); +static void MarkRemainingInactivePlacements(void); /* @@ -82,12 +133,21 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) /* ensure that the task is not NULL */ Assert(task != NULL); - /* disallow transactions and triggers during distributed modify commands */ + /* disallow triggers during distributed modify commands */ if (commandType != CMD_SELECT) { - bool topLevel = true; - PreventTransactionChain(topLevel, "distributed commands"); eflags |= EXEC_FLAG_SKIP_TRIGGERS; + + /* + * We could naturally handle function-based transactions (i.e. those + * using PL/pgSQL or similar) by checking the type of queryDesc->dest, + * but some customers already use functions that touch multiple shards + * from within a function, so we'll ignore functions for now. 
+ */ + if (IsTransactionBlock() && xactParticipantHash == NULL) + { + InitTransactionStateForTask(task); + } } /* signal that it is a router execution */ @@ -122,6 +182,62 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) } +/* + * InitTransactionStateForTask is called during executor start with the first + * modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the + * transaction participant hash, opens connections to this task's nodes, and + * populates the hash with those connections after sending BEGIN commands to + * each. If a node fails to respond, its connection is set to NULL to prevent + * further interaction with it during the transaction. + */ +static void +InitTransactionStateForTask(Task *task) +{ + ListCell *placementCell = NULL; + + xactParticipantHash = CreateXactParticipantHash(); + + foreach(placementCell, task->taskPlacementList) + { + ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); + NodeConnectionKey participantKey; + NodeConnectionEntry *participantEntry = NULL; + bool entryFound = false; + + PGconn *connection = NULL; + + MemSet(&participantKey, 0, sizeof(participantKey)); + strlcpy(participantKey.nodeName, placement->nodeName, + MAX_NODE_LENGTH + 1); + participantKey.nodePort = placement->nodePort; + + participantEntry = hash_search(xactParticipantHash, &participantKey, + HASH_ENTER, &entryFound); + Assert(!entryFound); + + connection = GetOrEstablishConnection(placement->nodeName, + placement->nodePort); + if (connection != NULL) + { + PGresult *result = PQexec(connection, "BEGIN"); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + WarnRemoteError(connection, result); + PurgeConnection(connection); + + connection = NULL; + } + + PQclear(result); + } + + participantEntry->connection = connection; + } + + IsModifyingTransaction = true; +} + + /* * CommutativityRuleToLockMode determines the commutativity rule for the given * command and returns the appropriate lock mode to enforce that rule. The @@ -188,6 +304,40 @@ AcquireExecutorShardLock(Task *task, LOCKMODE lockMode) } +/* + * CreateXactParticipantHash initializes the map used to store the connections + * needed to process distributed transactions. Unlike the connection cache, we + * permit NULL connections here to signify that a participant has seen an error + * and is no longer receiving commands during a transaction. This hash should + * be walked at transaction end to send final COMMIT or ABORT commands. + */ +static HTAB * +CreateXactParticipantHash(void) +{ + HTAB *xactParticipantHash = NULL; + HASHCTL info; + int hashFlags = 0; + + MemSet(&info, 0, sizeof(info)); + info.keysize = sizeof(NodeConnectionKey); + info.entrysize = sizeof(NodeConnectionEntry); + info.hcxt = TopTransactionContext; + hashFlags = (HASH_ELEM | HASH_CONTEXT); + +#if (PG_VERSION_NUM >= 90500) + hashFlags |= HASH_BLOBS; +#else + hashFlags |= HASH_FUNCTION; + info.hash = tag_hash; +#endif + + xactParticipantHash = hash_create("citus xact participant hash", 32, &info, + hashFlags); + + return xactParticipantHash; +} + + /* * RouterExecutorRun actually executes a single task on a worker. 
*/ @@ -328,7 +478,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; List *failedPlacementList = NIL; - ListCell *failedPlacementCell = NULL; int64 affectedTupleCount = -1; bool gotResults = false; char *queryString = task->queryString; @@ -338,10 +487,11 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, PlannedStmt *planStatement = queryDesc->plannedstmt; MultiPlan *multiPlan = GetMultiPlan(planStatement); Query *query = multiPlan->workerJob->jobQuery; + Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid; StringInfo queryStringInfo = makeStringInfo(); ExecuteMasterEvaluableFunctions(query); - DeparseShardQuery(query, task, queryStringInfo); + deparse_shard_query(query, relid, task->anchorShardId, queryStringInfo); queryString = queryStringInfo->data; elog(DEBUG4, "query before master evaluation: %s", task->queryString); @@ -355,11 +505,10 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, foreach(taskPlacementCell, taskPlacementList) { ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); - char *nodeName = taskPlacement->nodeName; - int32 nodePort = taskPlacement->nodePort; bool queryOK = false; int64 currentAffectedTupleCount = 0; - PGconn *connection = GetOrEstablishConnection(nodeName, nodePort); + PGconn *connection = GetConnectionForPlacement(taskPlacement, + isModificationQuery); if (connection == NULL) { @@ -370,7 +519,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { - PurgeConnection(connection); + PurgeConnectionForPlacement(taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } @@ -404,7 +553,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, "to modify "INT64_FORMAT, currentAffectedTupleCount, affectedTupleCount), errdetail("modified placement on %s:%d", - nodeName, nodePort))); + taskPlacement->nodeName, taskPlacement->nodePort))); } #if (PG_VERSION_NUM < 90600) @@ -427,7 +576,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, } else { - PurgeConnection(connection); + PurgeConnectionForPlacement(taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); @@ -437,6 +586,8 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, if (isModificationQuery) { + ListCell *failedPlacementCell = NULL; + /* if all placements failed, error out */ if (list_length(failedPlacementList) == list_length(task->taskPlacementList)) { @@ -463,16 +614,6 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, } -static void -DeparseShardQuery(Query *query, Task *task, StringInfo queryString) -{ - uint64 shardId = task->anchorShardId; - Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid; - - deparse_shard_query(query, relid, shardId, queryString); -} - - /* * ReturnRowsFromTuplestore moves rows from a given tuplestore into a * receiver. It performs the necessary limiting to support cursors. @@ -517,6 +658,99 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, } +/* + * GetConnectionForPlacement is the main entry point for acquiring a connection + * within the router executor. 
By using placements (rather than node names and + * ports) to identify connections, the router executor can keep track of shards + * used by multi-statement transactions and error out if a transaction tries + * to reach a new node altogether). In the single-statement commands context, + * GetConnectionForPlacement simply falls through to GetOrEstablishConnection. + */ +static PGconn * +GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) +{ + NodeConnectionKey participantKey; + NodeConnectionEntry *participantEntry = NULL; + bool entryFound = false; + + /* if not in a transaction, fall through to connection cache */ + if (xactParticipantHash == NULL) + { + PGconn *connection = GetOrEstablishConnection(placement->nodeName, + placement->nodePort); + + return connection; + } + + Assert(IsTransactionBlock()); + + MemSet(&participantKey, 0, sizeof(participantKey)); + strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); + participantKey.nodePort = placement->nodePort; + + participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND, + &entryFound); + + if (entryFound) + { + if (isModificationQuery) + { + RecordShardIdParticipant(placement->shardId, participantEntry); + } + + return participantEntry->connection; + } + else + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), + errmsg("no transaction participant matches %s:%d", + placement->nodeName, placement->nodePort), + errdetail("Transactions which modify distributed tables may only " + "target nodes affected by the modification command " + "which began the transaction."))); + } +} + + +/* + * PurgeConnectionForPlacement provides a way to purge an invalid connection + * from all relevant connection hashes using the placement involved in the + * query at the time of the error. If a transaction is ongoing, this function + * ensures the right node's connection is set to NULL in the participant map + * for the transaction in addition to purging the connection cache's entry. + */ +static void +PurgeConnectionForPlacement(ShardPlacement *placement) +{ + NodeConnectionKey nodeKey; + char *currentUser = CurrentUserName(); + + MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); + strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); + nodeKey.nodePort = placement->nodePort; + strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); + + PurgeConnectionByKey(&nodeKey); + + if (xactParticipantHash != NULL) + { + NodeConnectionEntry *participantEntry = NULL; + bool entryFound = false; + + Assert(IsTransactionBlock()); + + /* the participant hash doesn't use the user field */ + MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser)); + participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND, + &entryFound); + + Assert(entryFound); + + participantEntry->connection = NULL; + } +} + + /* * SendQueryInSingleRowMode sends the given query on the connection in an * asynchronous way. The function also sets the single-row mode on the @@ -830,6 +1064,50 @@ ConsumeQueryResult(PGconn *connection, int64 *rows) } +/* + * RecordShardIdParticipant registers a connection as being involved with a + * particular shard during a multi-statement transaction. 
+ */ +static void +RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry) +{ + XactShardConnSet *shardConnSetMatch = NULL; + ListCell *listCell = NULL; + MemoryContext oldContext = NULL; + List *connectionEntryList = NIL; + + /* check whether an entry already exists for this shard */ + foreach(listCell, xactShardConnSetList) + { + XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell); + + if (shardConnSet->shardId == affectedShardId) + { + shardConnSetMatch = shardConnSet; + } + } + + /* entries must last through the whole top-level transaction */ + oldContext = MemoryContextSwitchTo(TopTransactionContext); + + /* if no entry found, make one */ + if (shardConnSetMatch == NULL) + { + shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet)); + shardConnSetMatch->shardId = affectedShardId; + + xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch); + } + + /* add connection, avoiding duplicates */ + connectionEntryList = shardConnSetMatch->connectionEntryList; + shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList, + participantEntry); + + MemoryContextSwitchTo(oldContext); +} + + /* * RouterExecutorFinish cleans up after a distributed execution. */ @@ -864,3 +1142,240 @@ RouterExecutorEnd(QueryDesc *queryDesc) queryDesc->estate = NULL; queryDesc->totaltime = NULL; } + + +/* + * InstallRouterExecutorShmemHook simply installs a hook (intended to be called + * once during backend startup), which will itself register all the transaction + * callbacks needed by this executor. + */ +void +InstallRouterExecutorShmemHook(void) +{ + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = RegisterRouterExecutorXactCallbacks; +} + + +/* + * RegisterRouterExecutorXactCallbacks registers (sub-)transaction callbacks + * needed by this executor before calling any previous shmem startup hooks. + */ +static void +RegisterRouterExecutorXactCallbacks(void) +{ + RegisterXactCallback(RouterTransactionCallback, NULL); + RegisterSubXactCallback(RouterSubtransactionCallback, NULL); + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } +} + + +/* + * RouterTransactionCallback handles committing or aborting remote transactions + * after the local one has committed or aborted. It only sends COMMIT or ABORT + * commands to still-healthy remotes; the failed ones are marked as inactive if + * after a successful COMMIT (no need to mark on ABORTs). 
+ */ +static void +RouterTransactionCallback(XactEvent event, void *arg) +{ + if (xactParticipantHash == NULL) + { + return; + } + + switch (event) + { +#if (PG_VERSION_NUM >= 90500) + case XACT_EVENT_PARALLEL_COMMIT: +#endif + case XACT_EVENT_COMMIT: + { + break; + } + +#if (PG_VERSION_NUM >= 90500) + case XACT_EVENT_PARALLEL_ABORT: +#endif + case XACT_EVENT_ABORT: + { + bool commit = false; + + ExecuteTransactionEnd(commit); + + break; + } + + /* no support for prepare with multi-statement transactions */ + case XACT_EVENT_PREPARE: + case XACT_EVENT_PRE_PREPARE: + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot prepare a transaction that modified " + "distributed tables"))); + + break; + } + +#if (PG_VERSION_NUM >= 90500) + case XACT_EVENT_PARALLEL_PRE_COMMIT: +#endif + case XACT_EVENT_PRE_COMMIT: + { + bool commit = true; + + if (subXactAbortAttempted) + { + subXactAbortAttempted = false; + + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " + "which modify distributed tables"))); + } + + ExecuteTransactionEnd(commit); + MarkRemainingInactivePlacements(); + + /* leave early to avoid resetting transaction state */ + return; + } + } + + /* reset transaction state */ + IsModifyingTransaction = false; + xactParticipantHash = NULL; + xactShardConnSetList = NIL; + subXactAbortAttempted = false; +} + + +/* + * RouterSubtransactionCallback silently keeps track of any attempt to ROLLBACK + * TO SAVEPOINT, which is not permitted by this executor. At transaction end, + * the executor checks whether such a rollback was attempted and, if so, errors + * out entirely (with an appropriate message). + * + * This implementation permits savepoints so long as no rollbacks occur. + */ +static void +RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg) +{ + if ((xactParticipantHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB)) + { + subXactAbortAttempted = true; + } +} + + +/* + * ExecuteTransactionEnd ends any remote transactions still taking place on + * remote nodes. It uses xactParticipantHash to know which nodes need any + * final COMMIT or ABORT commands. Nodes which fail a final COMMIT will have + * their connection field set to NULL to permit placement invalidation. + */ +static void +ExecuteTransactionEnd(bool commit) +{ + const char *sqlCommand = commit ? "COMMIT TRANSACTION" : "ABORT TRANSACTION"; + HASH_SEQ_STATUS scan; + NodeConnectionEntry *participant; + bool completed = !commit; /* aborts are assumed completed */ + + hash_seq_init(&scan, xactParticipantHash); + while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan))) + { + PGconn *connection = participant->connection; + PGresult *result = NULL; + + if (PQstatus(connection) != CONNECTION_OK) + { + continue; + } + + result = PQexec(connection, sqlCommand); + if (PQresultStatus(result) == PGRES_COMMAND_OK) + { + completed = true; + } + else + { + WarnRemoteError(connection, result); + PurgeConnection(participant->connection); + + participant->connection = NULL; + } + + PQclear(result); + } + + if (!completed) + { + ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); + } +} + + +/* + * MarkRemainingInactivePlacements takes care of marking placements of a shard + * inactive after some of the placements rejected the final COMMIT phase of a + * transaction. 
This step is skipped if all placements reject the COMMIT, since + * in that case no modifications to the placement have persisted. + * + * Failures are detected by checking the connection field of the entries in the + * connection set for each shard: it is always set to NULL after errors. + */ +static void +MarkRemainingInactivePlacements(void) +{ + ListCell *shardConnSetCell = NULL; + + foreach(shardConnSetCell, xactShardConnSetList) + { + XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell); + List *participantList = shardConnSet->connectionEntryList; + ListCell *participantCell = NULL; + int successes = list_length(participantList); /* assume full success */ + + /* determine how many actual successes there were: subtract failures */ + foreach(participantCell, participantList) + { + NodeConnectionEntry *participant = NULL; + participant = (NodeConnectionEntry *) lfirst(participantCell); + + /* other codes sets connection to NULL after errors */ + if (participant->connection == NULL) + { + successes--; + } + } + + /* if no nodes succeeded for this shard, don't do anything */ + if (successes == 0) + { + continue; + } + + /* otherwise, ensure failed placements are marked inactive */ + foreach(participantCell, participantList) + { + NodeConnectionEntry *participant = NULL; + participant = (NodeConnectionEntry *) lfirst(participantCell); + + if (participant->connection == NULL) + { + uint64 shardId = shardConnSet->shardId; + NodeConnectionKey *nodeKey = &participant->cacheKey; + uint64 shardLength = 0; + + DeleteShardPlacementRow(shardId, nodeKey->nodeName, nodeKey->nodePort); + InsertShardPlacementRow(shardId, FILE_INACTIVE, shardLength, + nodeKey->nodeName, nodeKey->nodePort); + } + } + } +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 921500609..5eb231099 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -147,6 +147,9 @@ _PG_init(void) /* initialize worker node manager */ WorkerNodeRegister(); + + /* initialize router executor callbacks */ + InstallRouterExecutorShmemHook(); } diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 83eb73088..c6f062028 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -32,6 +32,9 @@ #include "utils/palloc.h" +/* state needed to prevent new connections during modifying transactions */ +bool IsModifyingTransaction = false; + /* * NodeConnectionHash is the connection hash itself. It begins uninitialized. * The first call to GetOrEstablishConnection triggers hash creation. 
@@ -81,9 +84,9 @@ GetOrEstablishConnection(char *nodeName, int32 nodePort) } memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey)); - strncpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH); + strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1); nodeConnectionKey.nodePort = nodePort; - strncpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN); + strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN); nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, HASH_FIND, &entryFound); @@ -124,11 +127,10 @@ void PurgeConnection(PGconn *connection) { NodeConnectionKey nodeConnectionKey; - NodeConnectionEntry *nodeConnectionEntry = NULL; - bool entryFound = false; char *nodeNameString = NULL; char *nodePortString = NULL; char *nodeUserString = NULL; + PGconn *purgedConnection = NULL; nodeNameString = ConnectionGetOptionValue(connection, "host"); if (nodeNameString == NULL) @@ -152,42 +154,54 @@ PurgeConnection(PGconn *connection) } memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey)); - strncpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH); + strlcpy(nodeConnectionKey.nodeName, nodeNameString, MAX_NODE_LENGTH + 1); nodeConnectionKey.nodePort = pg_atoi(nodePortString, sizeof(int32), 0); - strncpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN); + strlcpy(nodeConnectionKey.nodeUser, nodeUserString, NAMEDATALEN); pfree(nodeNameString); pfree(nodePortString); pfree(nodeUserString); - nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, - HASH_REMOVE, &entryFound); + purgedConnection = PurgeConnectionByKey(&nodeConnectionKey); + + /* + * It's possible the provided connection matches the host and port for + * an entry in the hash without being precisely the same connection. In + * that case, we will want to close the provided connection in addition + * to the one from the hash (which was closed by PurgeConnectionByKey). + */ + if (purgedConnection != connection) + { + ereport(WARNING, (errmsg("hash entry for \"%s:%d\" contained different " + "connection than that provided by caller", + nodeConnectionKey.nodeName, + nodeConnectionKey.nodePort))); + PQfinish(connection); + } +} + + +PGconn * +PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey) +{ + bool entryFound = false; + NodeConnectionEntry *nodeConnectionEntry = NULL; + + nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey, HASH_REMOVE, + &entryFound); if (entryFound) { - /* - * It's possible the provided connection matches the host and port for - * an entry in the hash without being precisely the same connection. In - * that case, we will want to close the hash's connection (because the - * entry has already been removed) in addition to the provided one. 
- */ - if (nodeConnectionEntry->connection != connection) - { - ereport(WARNING, (errmsg("hash entry for \"%s:%d\" contained different " - "connection than that provided by caller", - nodeConnectionKey.nodeName, - nodeConnectionKey.nodePort))); - PQfinish(nodeConnectionEntry->connection); - } + PQfinish(nodeConnectionEntry->connection); } else { ereport(WARNING, (errcode(ERRCODE_NO_DATA), errmsg("could not find hash entry for connection to \"%s:%d\"", - nodeConnectionKey.nodeName, - nodeConnectionKey.nodePort))); + nodeConnectionKey->nodeName, + nodeConnectionKey->nodePort))); } - PQfinish(connection); + return nodeConnectionEntry->connection; } @@ -370,6 +384,13 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser) sprintf(nodePortString, "%d", nodePort); + if (IsModifyingTransaction) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot open new connections after the first modification " + "command within a transaction"))); + } + Assert(sizeof(keywordArray) == sizeof(valueArray)); for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++) diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index 3db6194af..f0ee43dc4 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -399,8 +399,8 @@ TrackerCleanupJobSchemas(void) cleanupTask->assignedAt = HIGH_PRIORITY_TASK_TIME; cleanupTask->taskStatus = TASK_ASSIGNED; - strncpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, TASK_CALL_STRING_SIZE); - strncpy(cleanupTask->databaseName, databaseName, NAMEDATALEN); + strlcpy(cleanupTask->taskCallString, JOB_SCHEMA_CLEANUP, TASK_CALL_STRING_SIZE); + strlcpy(cleanupTask->databaseName, databaseName, NAMEDATALEN); /* zero out all other fields */ cleanupTask->connectionId = INVALID_CONNECTION_ID; diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index d167ccfaa..67189bea8 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -313,13 +313,13 @@ CreateTask(uint64 jobId, uint32 taskId, char *taskCallString) /* enter the worker task into shared hash and initialize the task */ workerTask = WorkerTasksHashEnter(jobId, taskId); workerTask->assignedAt = assignmentTime; - strncpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); + strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); workerTask->taskStatus = TASK_ASSIGNED; workerTask->connectionId = INVALID_CONNECTION_ID; workerTask->failureCount = 0; - strncpy(workerTask->databaseName, databaseName, NAMEDATALEN); - strncpy(workerTask->userName, userName, NAMEDATALEN); + strlcpy(workerTask->databaseName, databaseName, NAMEDATALEN); + strlcpy(workerTask->userName, userName, NAMEDATALEN); } @@ -350,13 +350,13 @@ UpdateTask(WorkerTask *workerTask, char *taskCallString) } else if (taskStatus == TASK_PERMANENTLY_FAILED) { - strncpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); + strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); workerTask->failureCount = 0; workerTask->taskStatus = TASK_ASSIGNED; } else { - strncpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); + strlcpy(workerTask->taskCallString, taskCallString, TASK_CALL_STRING_SIZE); workerTask->failureCount = 0; } } diff --git a/src/include/distributed/connection_cache.h 
b/src/include/distributed/connection_cache.h index 326e229c2..f335e0735 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -53,9 +53,14 @@ typedef struct NodeConnectionEntry } NodeConnectionEntry; +/* state needed to prevent new connections during modifying transactions */ +extern bool IsModifyingTransaction; + + /* function declarations for obtaining and using a connection */ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); +extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 032b8f49f..a23083590 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -14,6 +14,9 @@ #ifndef MULTI_PHYSICAL_PLANNER_H #define MULTI_PHYSICAL_PLANNER_H +#include "postgres.h" +#include "c.h" + #include "datatype/timestamp.h" #include "distributed/citus_nodes.h" #include "distributed/master_metadata_utility.h" diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index e01540237..51bb597f1 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -9,9 +9,27 @@ #ifndef MULTI_ROUTER_EXECUTOR_H_ #define MULTI_ROUTER_EXECUTOR_H_ +#include "c.h" + +#include "access/sdir.h" #include "distributed/multi_physical_planner.h" #include "executor/execdesc.h" +#include "nodes/pg_list.h" + +/* + * XactShardConnSet keeps track of the mapping from shard to the set of nodes + * involved in multi-statement transaction-wrapped modifications of that shard. + * This information is used to mark placements inactive at transaction close. 
+ */ +typedef struct XactShardConnSet +{ + uint64 shardId; /* identifier of the shard that was modified */ + List *connectionEntryList; /* NodeConnectionEntry pointers to participating nodes */ +} XactShardConnSet; + + +/* Config variables managed via guc.c */ extern bool AllModificationsCommutative; @@ -19,5 +37,6 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); +extern void InstallRouterExecutorShmemHook(void); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out new file mode 100644 index 000000000..e2b736f2f --- /dev/null +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -0,0 +1,507 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000; +-- =================================================================== +-- test end-to-end modification functionality +-- =================================================================== +CREATE TABLE researchers ( + id bigint NOT NULL, + lab_id int NOT NULL, + name text NOT NULL +); +CREATE TABLE labs ( + id bigint NOT NULL, + name text NOT NULL +); +SELECT master_create_distributed_table('researchers', 'lab_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('researchers', 2, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +SELECT master_create_distributed_table('labs', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('labs', 1, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +-- add some data +INSERT INTO researchers VALUES (1, 1, 'Donald Knuth'); +INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth'); +INSERT INTO researchers VALUES (3, 2, 'Tony Hoare'); +INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson'); +-- replace a researcher, reusing their id +BEGIN; +DELETE FROM researchers WHERE lab_id = 1 AND id = 2; +INSERT INTO researchers VALUES (2, 1, 'John Backus'); +COMMIT; +SELECT name FROM researchers WHERE lab_id = 1 AND id = 2; + name +------------- + John Backus +(1 row) + +-- abort a modification +BEGIN; +DELETE FROM researchers WHERE lab_id = 1 AND id = 1; +ABORT; +SELECT name FROM researchers WHERE lab_id = 1 AND id = 1; + name +-------------- + Donald Knuth +(1 row) + +-- creating savepoints should work... +BEGIN; +INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie'); +SAVEPOINT hire_thompson; +INSERT INTO researchers VALUES (6, 3, 'Ken Thompson'); +COMMIT; +SELECT name FROM researchers WHERE lab_id = 3 AND id = 6; + name +-------------- + Ken Thompson +(1 row) + +-- even if created by PL/pgSQL... 
+BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (10, 10, 'Edsger Dijkstra'); +EXCEPTION + WHEN not_null_violation THEN + RAISE NOTICE 'caught not_null_violation'; +END $$; +COMMIT; +-- but rollback should not +BEGIN; +INSERT INTO researchers VALUES (7, 4, 'Jim Gray'); +SAVEPOINT hire_engelbart; +INSERT INTO researchers VALUES (8, 4, 'Douglas Engelbart'); +ROLLBACK TO hire_engelbart; +COMMIT; +ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables +SELECT name FROM researchers WHERE lab_id = 4; + name +------ +(0 rows) + +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (NULL, 10, 'Edsger Dijkstra'); +EXCEPTION + WHEN not_null_violation THEN + RAISE NOTICE 'caught not_null_violation'; +END $$; +NOTICE: caught not_null_violation +COMMIT; +ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables +-- should be valid to edit labs after researchers... +BEGIN; +INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart'); +INSERT INTO labs VALUES (5, 'Los Alamos'); +COMMIT; +SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id; + id | lab_id | name | id | name +----+--------+-------------------+----+------------ + 8 | 5 | Douglas Engelbart | 5 | Los Alamos +(1 row) + +-- but not the other way around (would require expanding xact participants)... +BEGIN; +INSERT INTO labs VALUES (6, 'Bell Labs'); +INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); +ERROR: no transaction participant matches localhost:57638 +DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. +COMMIT; +-- this logic even applies to router SELECTs occurring after a modification: +-- selecting from the modified node is fine... +BEGIN; +INSERT INTO labs VALUES (6, 'Bell Labs'); +SELECT count(*) FROM researchers WHERE lab_id = 6; + count +------- + 0 +(1 row) + +ABORT; +-- but if a SELECT needs to go to new node, that's a problem... +BEGIN; +UPDATE pg_dist_shard_placement AS sp SET shardstate = 3 +FROM pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.nodename = 'localhost' +AND sp.nodeport = :worker_1_port +AND s.logicalrelid = 'researchers'::regclass; +INSERT INTO labs VALUES (6, 'Bell Labs'); +SELECT count(*) FROM researchers WHERE lab_id = 6; +ERROR: no transaction participant matches localhost:57638 +DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. +ABORT; +-- applies to DDL or COPY, too +BEGIN; +INSERT INTO labs VALUES (6, 'Bell Labs'); +ALTER TABLE labs ADD COLUMN text motto; +ERROR: distributed DDL commands cannot run inside a transaction block +COMMIT; +BEGIN; +INSERT INTO labs VALUES (6, 'Bell Labs'); +\copy labs from stdin delimiter ',' +ERROR: cannot open new connections after the first modification command within a transaction +CONTEXT: COPY labs, line 1: "10,Weyland-Yutani" +COMMIT; +-- though the copy will work if before any modifications +BEGIN; +\copy labs from stdin delimiter ',' +SELECT name FROM labs WHERE id = 10; + name +---------------- + Weyland-Yutani +(1 row) + +INSERT INTO labs VALUES (6, 'Bell Labs'); +COMMIT; +-- now, for some special failures... 
+CREATE TABLE objects ( + id bigint PRIMARY KEY, + name text NOT NULL +); +SELECT master_create_distributed_table('objects', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('objects', 1, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +-- test primary key violations +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (1, 'orange'); +ERROR: duplicate key value violates unique constraint "objects_pkey_1200003" +DETAIL: Key (id)=(1) already exists. +CONTEXT: while executing command on localhost:57637 +COMMIT; +-- data shouldn't have persisted... +SELECT * FROM objects WHERE id = 1; + id | name +----+------ +(0 rows) + +-- and placements should still be healthy... +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.shardstate = 1 +AND s.logicalrelid = 'objects'::regclass; + count +------- + 2 +(1 row) + +-- create trigger on one worker to reject certain values +\c - - - :worker_2_port +CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$ + BEGIN + IF (NEW.name = 'BAD') THEN + RAISE 'illegal value'; + END IF; + + RETURN NEW; + END; +$rb$ LANGUAGE plpgsql; +CREATE CONSTRAINT TRIGGER reject_bad +AFTER INSERT ON objects_1200003 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad(); +\c - - - :master_port +-- test partial failure; worker_1 succeeds, 2 fails +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (2, 'BAD'); +WARNING: illegal value +CONTEXT: while executing command on localhost:57638 +INSERT INTO labs VALUES (7, 'E Corp'); +COMMIT; +-- data should be persisted +SELECT * FROM objects WHERE id = 2; + id | name +----+------ + 2 | BAD +(1 row) + +SELECT * FROM labs WHERE id = 7; + id | name +----+-------- + 7 | E Corp +(1 row) + +-- but one placement should be bad +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.nodename = 'localhost' +AND sp.nodeport = :worker_2_port +AND sp.shardstate = 3 +AND s.logicalrelid = 'objects'::regclass; + count +------- + 1 +(1 row) + +DELETE FROM objects; +-- mark shards as healthy again; delete all data +UPDATE pg_dist_shard_placement AS sp SET shardstate = 1 +FROM pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND s.logicalrelid = 'objects'::regclass; +-- what if there are errors on different shards at different times? 
+\c - - - :worker_1_port +CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$ + BEGIN + IF (NEW.name = 'BAD') THEN + RAISE 'illegal value'; + END IF; + + RETURN NEW; + END; +$rb$ LANGUAGE plpgsql; +CREATE CONSTRAINT TRIGGER reject_bad +AFTER INSERT ON labs_1200002 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad(); +\c - - - :master_port +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (2, 'BAD'); +WARNING: illegal value +CONTEXT: while executing command on localhost:57638 +INSERT INTO labs VALUES (8, 'Aperture Science'); +INSERT INTO labs VALUES (9, 'BAD'); +WARNING: illegal value +CONTEXT: while executing command on localhost:57637 +ERROR: could not modify any active placements +COMMIT; +-- data should NOT be persisted +SELECT * FROM objects WHERE id = 1; + id | name +----+------ +(0 rows) + +SELECT * FROM labs WHERE id = 8; + id | name +----+------ +(0 rows) + +-- all placements should remain healthy +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.shardstate = 1 +AND (s.logicalrelid = 'objects'::regclass OR + s.logicalrelid = 'labs'::regclass); + count +------- + 3 +(1 row) + +-- what if the failures happen at COMMIT time? +\c - - - :worker_2_port +DROP TRIGGER reject_bad ON objects_1200003; +CREATE CONSTRAINT TRIGGER reject_bad +AFTER INSERT ON objects_1200003 +DEFERRABLE INITIALLY DEFERRED +FOR EACH ROW EXECUTE PROCEDURE reject_bad(); +\c - - - :master_port +-- should be the same story as before, just at COMMIT time +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (2, 'BAD'); +INSERT INTO labs VALUES (9, 'Umbrella Corporation'); +COMMIT; +WARNING: illegal value +CONTEXT: while executing command on localhost:57638 +-- data should be persisted +SELECT * FROM objects WHERE id = 2; + id | name +----+------ + 2 | BAD +(1 row) + +SELECT * FROM labs WHERE id = 7; + id | name +----+-------- + 7 | E Corp +(1 row) + +-- but one placement should be bad +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.nodename = 'localhost' +AND sp.nodeport = :worker_2_port +AND sp.shardstate = 3 +AND s.logicalrelid = 'objects'::regclass; + count +------- + 1 +(1 row) + +DELETE FROM objects; +-- mark shards as healthy again; delete all data +UPDATE pg_dist_shard_placement AS sp SET shardstate = 1 +FROM pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND s.logicalrelid = 'objects'::regclass; +-- what if all nodes have failures at COMMIT time? 
+\c - - - :worker_1_port +DROP TRIGGER reject_bad ON labs_1200002; +CREATE CONSTRAINT TRIGGER reject_bad +AFTER INSERT ON labs_1200002 +DEFERRABLE INITIALLY DEFERRED +FOR EACH ROW EXECUTE PROCEDURE reject_bad(); +\c - - - :master_port +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (2, 'BAD'); +INSERT INTO labs VALUES (8, 'Aperture Science'); +INSERT INTO labs VALUES (9, 'BAD'); +COMMIT; +WARNING: illegal value +CONTEXT: while executing command on localhost:57638 +WARNING: illegal value +CONTEXT: while executing command on localhost:57637 +ERROR: could not commit transaction on any active nodes +-- data should NOT be persisted +SELECT * FROM objects WHERE id = 1; + id | name +----+------ +(0 rows) + +SELECT * FROM labs WHERE id = 8; + id | name +----+------ +(0 rows) + +-- all placements should remain healthy +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.shardstate = 1 +AND (s.logicalrelid = 'objects'::regclass OR + s.logicalrelid = 'labs'::regclass); + count +------- + 3 +(1 row) + +-- what if one shard (objects) succeeds but another (labs) completely fails? +\c - - - :worker_2_port +DROP TRIGGER reject_bad ON objects_1200003; +\c - - - :master_port +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO labs VALUES (8, 'Aperture Science'); +INSERT INTO labs VALUES (9, 'BAD'); +COMMIT; +WARNING: illegal value +CONTEXT: while executing command on localhost:57637 +-- data to objects should be persisted, but labs should not... +SELECT * FROM objects WHERE id = 1; + id | name +----+------- + 1 | apple +(1 row) + +SELECT * FROM labs WHERE id = 8; + id | name +----+------ +(0 rows) + +-- labs should be healthy, but one object placement shouldn't be +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND (s.logicalrelid = 'objects'::regclass OR + s.logicalrelid = 'labs'::regclass) +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + logicalrelid | shardstate | count +--------------+------------+------- + labs | 1 | 1 + objects | 1 | 1 + objects | 3 | 1 +(3 rows) + +-- some append-partitioned tests for good measure +CREATE TABLE append_researchers ( LIKE researchers ); +SELECT master_create_distributed_table('append_researchers', 'id', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SET citus.shard_replication_factor TO 1; +SELECT master_create_empty_shard('append_researchers') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000 +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('append_researchers') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000 +WHERE shardid = :new_shard_id; +SET citus.shard_replication_factor TO DEFAULT; +-- try single-shard INSERT +BEGIN; +INSERT INTO append_researchers VALUES (0, 0, 'John Backus'); +COMMIT; +SELECT * FROM append_researchers WHERE id = 0; + id | lab_id | name +----+--------+------------- + 0 | 0 | John Backus +(1 row) + +-- try rollback +BEGIN; +DELETE FROM append_researchers WHERE id = 0; +ROLLBACK; +SELECT * FROM append_researchers WHERE id = 0; + id | lab_id | name +----+--------+------------- + 0 | 0 | John Backus +(1 row) + +-- try hitting shard on other node +BEGIN; +INSERT INTO append_researchers VALUES (1, 1, 'John McCarthy'); +INSERT INTO append_researchers VALUES 
(500000, 500000, 'Tony Hoare'); +ERROR: distributed modifications must target exactly one shard +ROLLBACK; +SELECT * FROM append_researchers; + id | lab_id | name +----+--------+------------- + 0 | 0 | John Backus +(1 row) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 5f725e68b..3f159ee7c 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -129,6 +129,7 @@ test: multi_utilities test: multi_create_insert_proxy test: multi_data_types test: multi_repartitioned_subquery_udf +test: multi_modifying_xacts # --------- # multi_copy creates hash and range-partitioned tables and performs COPY diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql new file mode 100644 index 000000000..c875e4c15 --- /dev/null +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -0,0 +1,400 @@ + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1200000; + + +-- =================================================================== +-- test end-to-end modification functionality +-- =================================================================== + +CREATE TABLE researchers ( + id bigint NOT NULL, + lab_id int NOT NULL, + name text NOT NULL +); + +CREATE TABLE labs ( + id bigint NOT NULL, + name text NOT NULL +); + +SELECT master_create_distributed_table('researchers', 'lab_id', 'hash'); +SELECT master_create_worker_shards('researchers', 2, 2); + +SELECT master_create_distributed_table('labs', 'id', 'hash'); +SELECT master_create_worker_shards('labs', 1, 1); + +-- add some data +INSERT INTO researchers VALUES (1, 1, 'Donald Knuth'); +INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth'); +INSERT INTO researchers VALUES (3, 2, 'Tony Hoare'); +INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson'); + +-- replace a researcher, reusing their id +BEGIN; +DELETE FROM researchers WHERE lab_id = 1 AND id = 2; +INSERT INTO researchers VALUES (2, 1, 'John Backus'); +COMMIT; + +SELECT name FROM researchers WHERE lab_id = 1 AND id = 2; + +-- abort a modification +BEGIN; +DELETE FROM researchers WHERE lab_id = 1 AND id = 1; +ABORT; + +SELECT name FROM researchers WHERE lab_id = 1 AND id = 1; + +-- creating savepoints should work... +BEGIN; +INSERT INTO researchers VALUES (5, 3, 'Dennis Ritchie'); +SAVEPOINT hire_thompson; +INSERT INTO researchers VALUES (6, 3, 'Ken Thompson'); +COMMIT; + +SELECT name FROM researchers WHERE lab_id = 3 AND id = 6; + +-- even if created by PL/pgSQL... +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (10, 10, 'Edsger Dijkstra'); +EXCEPTION + WHEN not_null_violation THEN + RAISE NOTICE 'caught not_null_violation'; +END $$; +COMMIT; + +-- but rollback should not +BEGIN; +INSERT INTO researchers VALUES (7, 4, 'Jim Gray'); +SAVEPOINT hire_engelbart; +INSERT INTO researchers VALUES (8, 4, 'Douglas Engelbart'); +ROLLBACK TO hire_engelbart; +COMMIT; + +SELECT name FROM researchers WHERE lab_id = 4; + +BEGIN; +DO $$ +BEGIN + INSERT INTO researchers VALUES (NULL, 10, 'Edsger Dijkstra'); +EXCEPTION + WHEN not_null_violation THEN + RAISE NOTICE 'caught not_null_violation'; +END $$; +COMMIT; + + +-- should be valid to edit labs after researchers... 
+BEGIN; +INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart'); +INSERT INTO labs VALUES (5, 'Los Alamos'); +COMMIT; + +SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id; + +-- but not the other way around (would require expanding xact participants)... +BEGIN; +INSERT INTO labs VALUES (6, 'Bell Labs'); +INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); +COMMIT; + +-- this logic even applies to router SELECTs occurring after a modification: +-- selecting from the modified node is fine... +BEGIN; +INSERT INTO labs VALUES (6, 'Bell Labs'); +SELECT count(*) FROM researchers WHERE lab_id = 6; +ABORT; + +-- but if a SELECT needs to go to new node, that's a problem... + +BEGIN; + +UPDATE pg_dist_shard_placement AS sp SET shardstate = 3 +FROM pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.nodename = 'localhost' +AND sp.nodeport = :worker_1_port +AND s.logicalrelid = 'researchers'::regclass; + +INSERT INTO labs VALUES (6, 'Bell Labs'); +SELECT count(*) FROM researchers WHERE lab_id = 6; +ABORT; + +-- applies to DDL or COPY, too +BEGIN; +INSERT INTO labs VALUES (6, 'Bell Labs'); +ALTER TABLE labs ADD COLUMN text motto; +COMMIT; + +BEGIN; +INSERT INTO labs VALUES (6, 'Bell Labs'); +\copy labs from stdin delimiter ',' +10,Weyland-Yutani +\. +COMMIT; + +-- though the copy will work if before any modifications +BEGIN; +\copy labs from stdin delimiter ',' +10,Weyland-Yutani +\. +SELECT name FROM labs WHERE id = 10; +INSERT INTO labs VALUES (6, 'Bell Labs'); +COMMIT; + +-- now, for some special failures... +CREATE TABLE objects ( + id bigint PRIMARY KEY, + name text NOT NULL +); + +SELECT master_create_distributed_table('objects', 'id', 'hash'); +SELECT master_create_worker_shards('objects', 1, 2); + +-- test primary key violations +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (1, 'orange'); +COMMIT; + +-- data shouldn't have persisted... +SELECT * FROM objects WHERE id = 1; + +-- and placements should still be healthy... +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.shardstate = 1 +AND s.logicalrelid = 'objects'::regclass; + +-- create trigger on one worker to reject certain values +\c - - - :worker_2_port + +CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$ + BEGIN + IF (NEW.name = 'BAD') THEN + RAISE 'illegal value'; + END IF; + + RETURN NEW; + END; +$rb$ LANGUAGE plpgsql; + +CREATE CONSTRAINT TRIGGER reject_bad +AFTER INSERT ON objects_1200003 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad(); + +\c - - - :master_port + +-- test partial failure; worker_1 succeeds, 2 fails +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (2, 'BAD'); +INSERT INTO labs VALUES (7, 'E Corp'); +COMMIT; + +-- data should be persisted +SELECT * FROM objects WHERE id = 2; +SELECT * FROM labs WHERE id = 7; + +-- but one placement should be bad +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.nodename = 'localhost' +AND sp.nodeport = :worker_2_port +AND sp.shardstate = 3 +AND s.logicalrelid = 'objects'::regclass; + +DELETE FROM objects; + +-- mark shards as healthy again; delete all data +UPDATE pg_dist_shard_placement AS sp SET shardstate = 1 +FROM pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND s.logicalrelid = 'objects'::regclass; + +-- what if there are errors on different shards at different times? 
+\c - - - :worker_1_port +CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$ + BEGIN + IF (NEW.name = 'BAD') THEN + RAISE 'illegal value'; + END IF; + + RETURN NEW; + END; +$rb$ LANGUAGE plpgsql; + +CREATE CONSTRAINT TRIGGER reject_bad +AFTER INSERT ON labs_1200002 +DEFERRABLE INITIALLY IMMEDIATE +FOR EACH ROW EXECUTE PROCEDURE reject_bad(); + +\c - - - :master_port + +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (2, 'BAD'); +INSERT INTO labs VALUES (8, 'Aperture Science'); +INSERT INTO labs VALUES (9, 'BAD'); +COMMIT; + +-- data should NOT be persisted +SELECT * FROM objects WHERE id = 1; +SELECT * FROM labs WHERE id = 8; + +-- all placements should remain healthy +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.shardstate = 1 +AND (s.logicalrelid = 'objects'::regclass OR + s.logicalrelid = 'labs'::regclass); + +-- what if the failures happen at COMMIT time? +\c - - - :worker_2_port + +DROP TRIGGER reject_bad ON objects_1200003; + +CREATE CONSTRAINT TRIGGER reject_bad +AFTER INSERT ON objects_1200003 +DEFERRABLE INITIALLY DEFERRED +FOR EACH ROW EXECUTE PROCEDURE reject_bad(); + +\c - - - :master_port + +-- should be the same story as before, just at COMMIT time +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (2, 'BAD'); +INSERT INTO labs VALUES (9, 'Umbrella Corporation'); +COMMIT; + +-- data should be persisted +SELECT * FROM objects WHERE id = 2; +SELECT * FROM labs WHERE id = 7; + +-- but one placement should be bad +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.nodename = 'localhost' +AND sp.nodeport = :worker_2_port +AND sp.shardstate = 3 +AND s.logicalrelid = 'objects'::regclass; + +DELETE FROM objects; + +-- mark shards as healthy again; delete all data +UPDATE pg_dist_shard_placement AS sp SET shardstate = 1 +FROM pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND s.logicalrelid = 'objects'::regclass; + +-- what if all nodes have failures at COMMIT time? +\c - - - :worker_1_port + +DROP TRIGGER reject_bad ON labs_1200002; + +CREATE CONSTRAINT TRIGGER reject_bad +AFTER INSERT ON labs_1200002 +DEFERRABLE INITIALLY DEFERRED +FOR EACH ROW EXECUTE PROCEDURE reject_bad(); + +\c - - - :master_port + +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO objects VALUES (2, 'BAD'); +INSERT INTO labs VALUES (8, 'Aperture Science'); +INSERT INTO labs VALUES (9, 'BAD'); +COMMIT; + +-- data should NOT be persisted +SELECT * FROM objects WHERE id = 1; +SELECT * FROM labs WHERE id = 8; + +-- all placements should remain healthy +SELECT count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND sp.shardstate = 1 +AND (s.logicalrelid = 'objects'::regclass OR + s.logicalrelid = 'labs'::regclass); + +-- what if one shard (objects) succeeds but another (labs) completely fails? +\c - - - :worker_2_port + +DROP TRIGGER reject_bad ON objects_1200003; + +\c - - - :master_port + +BEGIN; +INSERT INTO objects VALUES (1, 'apple'); +INSERT INTO labs VALUES (8, 'Aperture Science'); +INSERT INTO labs VALUES (9, 'BAD'); +COMMIT; + +-- data to objects should be persisted, but labs should not... 
+SELECT * FROM objects WHERE id = 1; +SELECT * FROM labs WHERE id = 8; + +-- labs should be healthy, but one object placement shouldn't be +SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) +FROM pg_dist_shard_placement AS sp, + pg_dist_shard AS s +WHERE sp.shardid = s.shardid +AND (s.logicalrelid = 'objects'::regclass OR + s.logicalrelid = 'labs'::regclass) +GROUP BY s.logicalrelid, sp.shardstate +ORDER BY s.logicalrelid, sp.shardstate; + +-- some append-partitioned tests for good measure +CREATE TABLE append_researchers ( LIKE researchers ); + +SELECT master_create_distributed_table('append_researchers', 'id', 'append'); + +SET citus.shard_replication_factor TO 1; + +SELECT master_create_empty_shard('append_researchers') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000 +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('append_researchers') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000 +WHERE shardid = :new_shard_id; + +SET citus.shard_replication_factor TO DEFAULT; + +-- try single-shard INSERT +BEGIN; +INSERT INTO append_researchers VALUES (0, 0, 'John Backus'); +COMMIT; + +SELECT * FROM append_researchers WHERE id = 0; + +-- try rollback +BEGIN; +DELETE FROM append_researchers WHERE id = 0; +ROLLBACK; + +SELECT * FROM append_researchers WHERE id = 0; + +-- try hitting shard on other node +BEGIN; +INSERT INTO append_researchers VALUES (1, 1, 'John McCarthy'); +INSERT INTO append_researchers VALUES (500000, 500000, 'Tony Hoare'); +ROLLBACK; + +SELECT * FROM append_researchers;
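
A minimal, standalone sketch of the placement-health rule this patch applies at COMMIT time in MarkRemainingInactivePlacements: a placement is marked inactive only when its connection failed and at least one other placement of the same shard committed successfully; if every participant for a shard failed, nothing persisted anywhere, so all placements stay healthy. This is an illustration only, not Citus code — every type and function name below (SketchPlacement, MarkFailedPlacementsForShard, the printf stand-in for the catalog update) is hypothetical.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* stand-in for a shard placement plus the health of its participant connection */
typedef struct SketchPlacement
{
	const char *nodeName;
	int nodePort;
	bool connectionOk;   /* mirrors participant->connection != NULL in the patch */
} SketchPlacement;

/* decide, for one shard, which placements to mark inactive after COMMIT */
static void
MarkFailedPlacementsForShard(long shardId, SketchPlacement *placements, size_t count)
{
	size_t successes = count;   /* assume full success, subtract failures */

	for (size_t i = 0; i < count; i++)
	{
		if (!placements[i].connectionOk)
		{
			successes--;
		}
	}

	/* if no placement committed, the modification never persisted: skip */
	if (successes == 0)
	{
		return;
	}

	for (size_t i = 0; i < count; i++)
	{
		if (!placements[i].connectionOk)
		{
			/* the real code rewrites the pg_dist_shard_placement row instead */
			printf("marking shard %ld inactive on %s:%d\n", shardId,
				   placements[i].nodeName, placements[i].nodePort);
		}
	}
}

int
main(void)
{
	/* matches the regression scenario above: worker_1 commits, worker_2 rejects */
	SketchPlacement placements[] = {
		{ "localhost", 57637, true },
		{ "localhost", 57638, false }
	};

	MarkFailedPlacementsForShard(1200003, placements, 2);
	return 0;
}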