diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index b1ed3d5cd..f8745d466 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -433,7 +433,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) } /* prevent concurrent placement changes and non-commutative DML statements */ - LockShards(shardIntervalList, ShareLock); + LockShardListMetadata(shardIntervalList, ShareLock); + LockShardListResources(shardIntervalList, ShareLock); /* initialize the shard interval cache */ shardCount = cacheEntry->shardIntervalArrayLength; diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 4476e058c..6c0238a23 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -28,6 +28,7 @@ #include "catalog/pg_type.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -35,6 +36,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" +#include "distributed/multi_shard_transaction.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "executor/execdesc.h" @@ -64,6 +66,7 @@ /* controls use of locks to enforce safe commutativity */ bool AllModificationsCommutative = false; + /* * The following static variables are necessary to track the progression of * multi-statement transactions managed by the router executor. 
After the first @@ -84,15 +87,15 @@ static bool subXactAbortAttempted = false; /* functions needed during start phase */ static void InitTransactionStateForTask(Task *task); -static void AcquireExecutorShardLock(Task *task, CmdType commandType); static HTAB * CreateXactParticipantHash(void); /* functions needed during run phase */ -static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, - Task *task, - bool isModificationQuery, - bool requiresMasterEvaluation, - bool expectResults); +static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task, + bool isModificationQuery, bool expectResults); +static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList, + bool isModificationQuery, bool expectResults); +static List * TaskShardIntervalList(List *taskList); +static void AcquireExecutorShardLock(Task *task, CmdType commandType); static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, DestReceiver *destination, Tuplestorestate *tupleStore); @@ -106,8 +109,8 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, static bool SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo); static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, - TupleDesc tupleDescriptor, int64 *rows); -static bool ConsumeQueryResult(PGconn *connection, int64 *rows); + TupleDesc tupleDescriptor, bool failOnError, int64 *rows); +static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows); static void RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry); @@ -136,25 +139,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) if (commandType != CMD_SELECT) { eflags |= EXEC_FLAG_SKIP_TRIGGERS; - - if (XactModificationLevel == XACT_MODIFICATION_SCHEMA) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("distributed data modifications must not appear in " - "transaction blocks which contain distributed DDL " - "commands"))); - } - - /* - * We could naturally handle function-based transactions (i.e. those - * using PL/pgSQL or similar) by checking the type of queryDesc->dest, - * but some customers already use functions that touch multiple shards - * from within a function, so we'll ignore functions for now.
- */ - if (IsTransactionBlock() && xactParticipantHash == NULL) - { - InitTransactionStateForTask(task); - } } /* signal that it is a router execution */ @@ -364,7 +348,6 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) MultiPlan *multiPlan = GetMultiPlan(planStatement); Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; - Task *task = NULL; EState *estate = queryDesc->estate; CmdType operation = queryDesc->operation; MemoryContext oldcontext = NULL; @@ -372,14 +355,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) MaterialState *routerState = (MaterialState *) queryDesc->planstate; bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning; - /* router executor can only execute distributed plans with a single task */ - Assert(list_length(taskList) == 1); - task = (Task *) linitial(taskList); - Assert(estate != NULL); Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY)); - Assert(task != NULL); - oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); @@ -415,7 +392,6 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) */ if (!routerState->eof_underlying) { - bool resultsOK = false; bool isModificationQuery = false; bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation; @@ -430,13 +406,45 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) (int) operation))); } - resultsOK = ExecuteTaskAndStoreResults(queryDesc, task, - isModificationQuery, - requiresMasterEvaluation, - sendTuples); - if (!resultsOK) + if (requiresMasterEvaluation) { - ereport(ERROR, (errmsg("could not receive query results"))); + ListCell *taskCell = NULL; + Query *query = workerJob->jobQuery; + Oid relationId = ((RangeTblEntry *) linitial(query->rtable))->relid; + + ExecuteMasterEvaluableFunctions(query); + + foreach(taskCell, taskList) + { + Task *task = (Task *) lfirst(taskCell); + StringInfo newQueryString = makeStringInfo(); + + deparse_shard_query(query, relationId, task->anchorShardId, + newQueryString); + + elog(DEBUG4, "query before master evaluation: %s", task->queryString); + elog(DEBUG4, "query after master evaluation: %s", newQueryString->data); + + task->queryString = newQueryString->data; + } + } + + if (list_length(taskList) == 1) + { + Task *task = (Task *) linitial(taskList); + bool resultsOK = false; + + resultsOK = ExecuteSingleTask(queryDesc, task, isModificationQuery, + sendTuples); + if (!resultsOK) + { + ereport(ERROR, (errmsg("could not receive query results"))); + } + } + else + { + ExecuteMultipleTasks(queryDesc, taskList, isModificationQuery, + sendTuples); } /* mark underlying query as having executed */ @@ -483,8 +491,8 @@ out: /* - * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves - * the results and stores them, if SELECT or RETURNING is used, in a tuple + * ExecuteSingleTask executes the task on the remote node, retrieves the + * results and stores them, if SELECT or RETURNING is used, in a tuple * store. * * If the task fails on one of the placements, the function retries it on @@ -493,12 +501,11 @@ out: * failed), or errors out (DML failed on all placements). 
*/ static bool -ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, - bool isModificationQuery, - bool requiresMasterEvaluation, - bool expectResults) +ExecuteSingleTask(QueryDesc *queryDesc, Task *task, + bool isModificationQuery, + bool expectResults) { - CmdType commandType = queryDesc->operation; + CmdType operation = queryDesc->operation; TupleDesc tupleDescriptor = queryDesc->tupDesc; EState *executorState = queryDesc->estate; MaterialState *routerState = (MaterialState *) queryDesc->planstate; @@ -511,24 +518,27 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, bool gotResults = false; char *queryString = task->queryString; - if (isModificationQuery && requiresMasterEvaluation) + if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) { - PlannedStmt *planStatement = queryDesc->plannedstmt; - MultiPlan *multiPlan = GetMultiPlan(planStatement); - Query *query = multiPlan->workerJob->jobQuery; - Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid; - StringInfo queryStringInfo = makeStringInfo(); + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("single-shard DML commands must not appear in " + "transaction blocks which contain multi-shard data " + "modifications"))); + } - ExecuteMasterEvaluableFunctions(query); - deparse_shard_query(query, relid, task->anchorShardId, queryStringInfo); - queryString = queryStringInfo->data; - - elog(DEBUG4, "query before master evaluation: %s", task->queryString); - elog(DEBUG4, "query after master evaluation: %s", queryString); + /* + * We could naturally handle function-based transactions (i.e. those + * using PL/pgSQL or similar) by checking the type of queryDesc->dest, + * but some customers already use functions that touch multiple shards + * from within a function, so we'll ignore functions for now. + */ + if (operation != CMD_SELECT && xactParticipantHash == NULL && IsTransactionBlock()) + { + InitTransactionStateForTask(task); } /* prevent replicas of the same shard from diverging */ - AcquireExecutorShardLock(task, commandType); + AcquireExecutorShardLock(task, operation); /* * Try to run the query to completion on one placement. If the query fails @@ -538,6 +548,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, { ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); bool queryOK = false; + bool failOnError = false; int64 currentAffectedTupleCount = 0; PGconn *connection = GetConnectionForPlacement(taskPlacement, isModificationQuery); @@ -564,11 +575,12 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, if (!gotResults && expectResults) { queryOK = StoreQueryResult(routerState, connection, tupleDescriptor, - &currentAffectedTupleCount); + failOnError, &currentAffectedTupleCount); } else { - queryOK = ConsumeQueryResult(connection, &currentAffectedTupleCount); + queryOK = ConsumeQueryResult(connection, failOnError, + &currentAffectedTupleCount); } if (queryOK) @@ -642,6 +654,200 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, } +/* + * ExecuteMultipleTasks executes a list of tasks on remote nodes, retrieves + * the results and, if RETURNING is used, stores them in a tuple store. + * + * If a task fails on one of the placements, the transaction rolls back. + * Otherwise, the changes are committed using 2PC when the local transaction + * commits.
+ */ +static void +ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList, + bool isModificationQuery, bool expectResults) +{ + TupleDesc tupleDescriptor = queryDesc->tupDesc; + EState *executorState = queryDesc->estate; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + ParamListInfo paramListInfo = queryDesc->params; + int64 affectedTupleCount = -1; + + /* can only support modifications right now */ + Assert(isModificationQuery); + + affectedTupleCount = ExecuteModifyTasks(taskList, expectResults, paramListInfo, + routerState, tupleDescriptor); + + executorState->es_processed = affectedTupleCount; +} + + +/* + * ExecuteModifyTasks executes a list of tasks on remote nodes, and + * optionally retrieves the results and stores them in a tuple store. + * + * If a task fails on one of the placements, the transaction rolls back. + * Otherwise, the changes are committed using 2PC when the local transaction + * commits. + */ +int64 +ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo, + MaterialState *routerState, TupleDesc tupleDescriptor) +{ + int64 totalAffectedTupleCount = 0; + ListCell *taskCell = NULL; + char *userName = CurrentUserName(); + List *shardIntervalList = NIL; + + if (XactModificationLevel == XACT_MODIFICATION_DATA) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("multi-shard data modifications must not appear in " + "transaction blocks which contain single-shard DML " + "commands"))); + } + + XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; + + shardIntervalList = TaskShardIntervalList(taskList); + + /* ensure that there are no concurrent modifications on the same shards */ + LockShardListResources(shardIntervalList, ExclusiveLock); + + /* open connection to all relevant placements, if not already open */ + OpenTransactionsToAllShardPlacements(shardIntervalList, userName); + + /* send command to all relevant shard placements */ + foreach(taskCell, taskList) + { + Task *task = (Task *) lfirst(taskCell); + int64 shardId = task->anchorShardId; + char *queryString = task->queryString; + bool shardConnectionsFound = false; + ShardConnections *shardConnections = NULL; + List *connectionList = NIL; + ListCell *connectionCell = NULL; + + shardConnections = GetShardConnections(shardId, &shardConnectionsFound); + Assert(shardConnectionsFound); + + connectionList = shardConnections->connectionList; + Assert(connectionList != NIL); + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + bool queryOK = false; + + queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); + if (!queryOK) + { + ReraiseRemoteError(connection, NULL); + } + } + } + + /* collects results from all relevant shard placements */ + foreach(taskCell, taskList) + { + Task *task = (Task *) lfirst(taskCell); + int64 shardId = task->anchorShardId; + bool shardConnectionsFound = false; + ShardConnections *shardConnections = NULL; + List *connectionList = NIL; + ListCell *connectionCell = NULL; + int64 affectedTupleCount = 0; + bool gotResults = false; + + /* abort in case of cancellation */ + CHECK_FOR_INTERRUPTS(); + + shardConnections = GetShardConnections(shardId, &shardConnectionsFound); + connectionList = shardConnections->connectionList; + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) 
lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + int64 currentAffectedTupleCount = 0; + bool failOnError = true; + bool queryOK PG_USED_FOR_ASSERTS_ONLY = false; + + /* + * If caller is interested, store query results the first time + * through. The output of the query's execution on other shards is + * discarded if we run there (because it's a modification query). + */ + if (!gotResults && expectResults) + { + Assert(routerState != NULL && tupleDescriptor != NULL); + + queryOK = StoreQueryResult(routerState, connection, tupleDescriptor, + failOnError, &currentAffectedTupleCount); + } + else + { + queryOK = ConsumeQueryResult(connection, failOnError, + &currentAffectedTupleCount); + } + + /* should have rolled back on error */ + Assert(queryOK); + + if (!gotResults) + { + affectedTupleCount = currentAffectedTupleCount; + totalAffectedTupleCount += affectedTupleCount; + } + else if (currentAffectedTupleCount != affectedTupleCount) + { + char *nodeName = ConnectionGetOptionValue(connection, "host"); + char *nodePort = ConnectionGetOptionValue(connection, "port"); + + ereport(WARNING, + (errmsg("modified "INT64_FORMAT " tuples, but expected " + "to modify "INT64_FORMAT, + currentAffectedTupleCount, affectedTupleCount), + errdetail("modified placement on %s:%s", nodeName, + nodePort))); + } + + gotResults = true; + } + } + + CHECK_FOR_INTERRUPTS(); + + return totalAffectedTupleCount; +} + + +/* + * TaskShardIntervalList returns a list of shard intervals for a given list of + * tasks. + */ +static List * +TaskShardIntervalList(List *taskList) +{ + ListCell *taskCell = NULL; + List *shardIntervalList = NIL; + + foreach(taskCell, taskList) + { + Task *task = (Task *) lfirst(taskCell); + int64 shardId = task->anchorShardId; + ShardInterval *shardInterval = LoadShardInterval(shardId); + + shardIntervalList = lappend(shardIntervalList, shardInterval); + } + + return shardIntervalList; +} + + /* * ReturnRowsFromTuplestore moves rows from a given tuplestore into a * receiver. It performs the necessary limiting to support cursors. @@ -929,7 +1135,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT */ static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, - TupleDesc tupleDescriptor, int64 *rows) + TupleDesc tupleDescriptor, bool failOnError, int64 *rows) { AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); Tuplestorestate *tupleStore = NULL; @@ -947,7 +1153,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, { routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); } - else + else if (!failOnError) { /* might have failed query execution on another placement before */ tuplestore_clear(routerState->tuplestorestate); @@ -974,7 +1180,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, { char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); int category = 0; - bool raiseError = false; + bool isConstraintViolation = false; /* * If the error code is in constraint violation class, we want to * retry it on other * placements.
*/ category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); - raiseError = SqlStateMatchesCategory(sqlStateString, category); + isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category); - if (raiseError) + if (isConstraintViolation || failOnError) { RemoveXactConnection(connection); ReraiseRemoteError(connection, result); @@ -1056,7 +1262,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection, * has been an error. */ static bool -ConsumeQueryResult(PGconn *connection, int64 *rows) +ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows) { bool commandFailed = false; bool gotResponse = false; @@ -1086,7 +1292,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows) { char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); int category = 0; - bool raiseError = false; + bool isConstraintViolation = false; /* * If the error code is in constraint violation class, we want to * retry it on other * placements. */ category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); - raiseError = SqlStateMatchesCategory(sqlStateString, category); + isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category); - if (raiseError) + if (isConstraintViolation || failOnError) { RemoveXactConnection(connection); ReraiseRemoteError(connection, result); @@ -1118,8 +1324,11 @@ ConsumeQueryResult(PGconn *connection, int64 *rows) { char *currentAffectedTupleString = PQcmdTuples(result); int64 currentAffectedTupleCount = 0; - scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount); - Assert(currentAffectedTupleCount >= 0); + if (*currentAffectedTupleString != '\0') + { + scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount); + Assert(currentAffectedTupleCount >= 0); + } #if (PG_VERSION_NUM < 90600) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 8864a7b16..05182dbba 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -1292,7 +1292,8 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, { ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), errmsg("distributed DDL commands must not appear within " - "transaction blocks containing data modifications"))); + "transaction blocks containing single-shard data " + "modifications"))); } ShowNoticeIfNotUsing2PC(); @@ -1305,7 +1306,7 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, ereport(ERROR, (errmsg("could not execute DDL command on worker node shards"))); } - XactModificationLevel = XACT_MODIFICATION_SCHEMA; + XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; } @@ -1359,7 +1360,7 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString) Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); - LockShards(shardIntervalList, ShareLock); + LockShardListResources(shardIntervalList, ShareLock); OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner); foreach(shardIntervalCell, shardIntervalList) diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index d54407341..169f22d91 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -41,6 +41,7 @@ #include
"distributed/pg_dist_shard.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" +#include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" #include "optimizer/clauses.h" #include "optimizer/predtest.h" @@ -55,11 +56,9 @@ #include "utils/memutils.h" -static void LockShardsForModify(List *shardIntervalList); -static bool HasReplication(List *shardIntervalList); -static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId); -static int SendQueryToPlacements(char *shardQueryString, - ShardConnections *shardConnections); +static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, + Oid relationId); + PG_FUNCTION_INFO_V1(master_modify_multiple_shards); @@ -83,14 +82,12 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) Query *modifyQuery = NULL; Node *queryTreeNode; List *restrictClauseList = NIL; - bool isTopLevel = true; bool failOK = false; List *shardIntervalList = NIL; List *prunedShardIntervalList = NIL; + List *taskList = NIL; int32 affectedTupleCount = 0; - PreventTransactionChain(isTopLevel, "master_modify_multiple_shards"); - queryTreeNode = ParseTreeNode(queryString); if (IsA(queryTreeNode, DeleteStmt)) { @@ -163,183 +160,50 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) CHECK_FOR_INTERRUPTS(); - LockShardsForModify(prunedShardIntervalList); - - affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList, - relationId); + taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, + relationId); + affectedTupleCount = ExecuteModifyTasks(taskList, false, NULL, NULL, NULL); PG_RETURN_INT32(affectedTupleCount); } /* - * LockShardsForModify command locks the replicas of given shard. The - * lock logic is slightly different from LockShards function. Basically, - * - * 1. If citus.all_modifications_commutative is set to true, then all locks - * are acquired as ShareLock. - * 2. If citus.all_modifications_commutative is false, then only the shards - * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the - * lock is acquired with ShareLock. + * ModifyMultipleShardsTaskList builds a list of tasks to execute a query on a + * given list of shards. */ -static void -LockShardsForModify(List *shardIntervalList) +static List * +ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relationId) { - LOCKMODE lockMode = NoLock; + List *taskList = NIL; + ListCell *shardIntervalCell = NULL; + uint64 jobId = INVALID_JOB_ID; + int taskId = 1; - if (AllModificationsCommutative) - { - lockMode = ShareLock; - } - else if (!HasReplication(shardIntervalList)) - { - lockMode = ShareLock; - } - else - { - lockMode = ExclusiveLock; - } - - LockShards(shardIntervalList, lockMode); -} - - -/* - * HasReplication checks whether any of the shards in the given list has more - * than one replica. 
- */ -static bool -HasReplication(List *shardIntervalList) -{ - ListCell *shardIntervalCell; - bool hasReplication = false; + /* lock metadata before getting placement lists */ + LockShardListMetadata(shardIntervalList, ShareLock); foreach(shardIntervalCell, shardIntervalList) { ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - uint64 shardId = shardInterval->shardId; - List *shardPlacementList = FinalizedShardPlacementList(shardId); - if (shardPlacementList->length > 1) - { - hasReplication = true; - } - } - - return hasReplication; -} - - -/* - * SendQueryToShards executes the given query in all placements of the given - * shard list and returns the total affected tuple count. The execution is done - * in a distributed transaction and the commit protocol is decided according to - * the value of citus.multi_shard_commit_protocol parameter. SendQueryToShards - * does not acquire locks for the shards so it is advised to acquire locks to - * the shards when necessary before calling SendQueryToShards. - */ -static int -SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId) -{ - int affectedTupleCount = 0; - char *relationOwner = TableOwner(relationId); - ListCell *shardIntervalCell = NULL; - - OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner); - - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst( - shardIntervalCell); Oid relationId = shardInterval->relationId; uint64 shardId = shardInterval->shardId; - bool shardConnectionsFound = false; - ShardConnections *shardConnections = NULL; StringInfo shardQueryString = makeStringInfo(); - char *shardQueryStringData = NULL; - int shardAffectedTupleCount = -1; - - shardConnections = GetShardConnections(shardId, &shardConnectionsFound); - Assert(shardConnectionsFound); + Task *task = NULL; deparse_shard_query(query, relationId, shardId, shardQueryString); - shardQueryStringData = shardQueryString->data; - shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData, - shardConnections); - affectedTupleCount += shardAffectedTupleCount; + + task = CitusMakeNode(Task); + task->jobId = jobId; + task->taskId = taskId++; + task->taskType = SQL_TASK; + task->queryString = shardQueryString->data; + task->dependedTaskList = NULL; + task->anchorShardId = shardId; + task->taskPlacementList = FinalizedShardPlacementList(shardId); + + taskList = lappend(taskList, task); } - /* check for cancellation one last time before returning */ - CHECK_FOR_INTERRUPTS(); - - return affectedTupleCount; -} - - -/* - * SendQueryToPlacements sends the given query string to all given placement - * connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions - * should be called after all queries have been sent successfully.
- */ -static int -SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections) -{ - uint64 shardId = shardConnections->shardId; - List *connectionList = shardConnections->connectionList; - ListCell *connectionCell = NULL; - int32 shardAffectedTupleCount = -1; - - Assert(connectionList != NIL); - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - PGresult *result = NULL; - char *placementAffectedTupleString = NULL; - int32 placementAffectedTupleCount = -1; - - CHECK_FOR_INTERRUPTS(); - - /* send the query */ - result = PQexec(connection, shardQueryString); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - WarnRemoteError(connection, result); - ereport(ERROR, (errmsg("could not send query to shard placement"))); - } - - placementAffectedTupleString = PQcmdTuples(result); - - /* returned tuple count is empty for utility commands, use 0 as affected count */ - if (*placementAffectedTupleString == '\0') - { - placementAffectedTupleCount = 0; - } - else - { - placementAffectedTupleCount = pg_atoi(placementAffectedTupleString, - sizeof(int32), 0); - } - - if ((shardAffectedTupleCount == -1) || - (shardAffectedTupleCount == placementAffectedTupleCount)) - { - shardAffectedTupleCount = placementAffectedTupleCount; - } - else - { - ereport(ERROR, - (errmsg("modified %d tuples, but expected to modify %d", - placementAffectedTupleCount, shardAffectedTupleCount), - errdetail("Affected tuple counts at placements of shard " - UINT64_FORMAT " are different.", shardId))); - } - - PQclear(result); - - transactionConnection->transactionState = TRANSACTION_STATE_OPEN; - } - - return shardAffectedTupleCount; + return taskList; } diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 09d251c0e..054900469 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -153,7 +153,7 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName) transactionConnection->groupId = workerNode->groupId; transactionConnection->connectionId = shardConnections->shardId; - transactionConnection->transactionState = TRANSACTION_STATE_INVALID; + transactionConnection->transactionState = TRANSACTION_STATE_OPEN; transactionConnection->connection = connection; transactionConnection->nodeName = shardPlacement->nodeName; transactionConnection->nodePort = shardPlacement->nodePort; diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 8f852fdd4..28f97a8a7 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -125,21 +125,9 @@ void PurgeConnection(PGconn *connection) { NodeConnectionKey nodeConnectionKey; - PGconn *purgedConnection = NULL; BuildKeyForConnection(connection, &nodeConnectionKey); - purgedConnection = PurgeConnectionByKey(&nodeConnectionKey); - - /* - * It's possible the provided connection matches the host and port for - * an entry in the hash without being precisely the same connection. In - * that case, we will want to close the provided connection in addition - * to the one from the hash (which was closed by PurgeConnectionByKey). 
- */ - if (purgedConnection != connection) - { - PQfinish(connection); - } + PurgeConnectionByKey(&nodeConnectionKey); } diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index f9048ccf9..e7af6f27a 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -20,6 +20,7 @@ #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_router_executor.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" @@ -28,9 +29,8 @@ /* * LockShardDistributionMetadata returns after grabbing a lock for distribution - * metadata related to the specified shard, blocking if required. ExclusiveLock - * and ShareLock modes are supported. Any locks acquired using this method are - * released at transaction end. + * metadata related to the specified shard, blocking if required. Any locks + * acquired using this method are released at transaction end. */ void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode) @@ -126,12 +126,11 @@ UnlockJobResource(uint64 jobId, LOCKMODE lockmode) /* - * LockShards takes shared locks on the metadata and the data of all shards in - * shardIntervalList. This prevents concurrent placement changes and concurrent - * DML statements that require an exclusive lock. + * LockShardListMetadata takes shared locks on the metadata of all shards in + * shardIntervalList to prevent concurrent placement changes. */ void -LockShards(List *shardIntervalList, LOCKMODE lockMode) +LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode) { ListCell *shardIntervalCell = NULL; @@ -143,10 +142,28 @@ ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); int64 shardId = shardInterval->shardId; - /* prevent concurrent changes to number of placements */ LockShardDistributionMetadata(shardId, lockMode); + } +} + + +/* + * LockShardListResources takes locks on all shards in shardIntervalList to + * prevent concurrent DML statements on those shards.
+ */ +void +LockShardListResources(List *shardIntervalList, LOCKMODE lockMode) +{ + ListCell *shardIntervalCell = NULL; + + /* lock shards in order of shard id to prevent deadlock */ + shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + int64 shardId = shardInterval->shardId; - /* prevent concurrent update/delete statements */ LockShardResource(shardId, lockMode); } } diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 54fb97a22..7623cafb2 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -58,7 +58,7 @@ typedef enum XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */ XACT_MODIFICATION_NONE, /* no modifications have taken place */ XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */ - XACT_MODIFICATION_SCHEMA /* schema modifications (DDL) have occurred */ + XACT_MODIFICATION_MULTI_SHARD /* multi-shard modifications have occurred */ } XactModificationType; diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 236222f5c..61c3e0c67 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -39,4 +39,8 @@ extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); extern void RegisterRouterExecutorXactCallbacks(void); +extern int64 ExecuteModifyTasks(List *taskList, bool expectResults, + ParamListInfo paramListInfo, MaterialState *routerState, + TupleDesc tupleDescriptor); + #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 17319a079..454e8085f 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -75,7 +75,10 @@ extern void UnlockShardResource(uint64 shardId, LOCKMODE lockmode); extern void LockJobResource(uint64 jobId, LOCKMODE lockmode); extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode); -extern void LockShards(List *shardIntervalList, LOCKMODE lockMode); +/* Lock multiple shards for safe modification */ +extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode); +extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode); + extern void LockMetadataSnapshot(LOCKMODE lockMode); #endif /* RESOURCE_LOCK_H */ diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index a7f030076..28f338a1b 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -168,13 +168,13 @@ ABORT; BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); ALTER TABLE labs ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications +ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications COMMIT; -- whether it occurs first or second BEGIN; ALTER TABLE labs ADD COLUMN motto text; INSERT INTO labs VALUES (6, 'Bell Labs'); -ERROR: distributed data modifications must not appear in transaction blocks which contain distributed DDL commands +ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard 
data modifications COMMIT; -- but the DDL should correctly roll back \d labs @@ -244,7 +244,7 @@ SELECT * FROM labs WHERE id = 12; BEGIN; \copy labs from stdin delimiter ',' ALTER TABLE labs ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications +ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications COMMIT; -- the DDL fails, but copy persists \d labs diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index d722c3ad3..328b6ce33 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -22,11 +22,27 @@ SELECT master_create_worker_shards('multi_shard_modify_test', 4, 2); COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'csv'); -- Testing master_modify_multiple_shards --- Verify that master_modify_multiple_shards cannot be called in a transaction block +-- Verify that master_modify_multiple_shards can be rolled back BEGIN; SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13'); -ERROR: master_modify_multiple_shards cannot run inside a transaction block + master_modify_multiple_shards +------------------------------- + 3 +(1 row) + +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 202'); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + ROLLBACK; +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 27 +(1 row) + -- Check that master_modify_multiple_shards cannot be called with non-distributed tables CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text); INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder'); diff --git a/src/test/regress/expected/multi_truncate.out b/src/test/regress/expected/multi_truncate.out index 09fb65e06..0e17df7de 100644 --- a/src/test/regress/expected/multi_truncate.out +++ b/src/test/regress/expected/multi_truncate.out @@ -138,11 +138,15 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::re 1210005 (3 rows) --- command can not be run inside transaction -BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT; -ERROR: master_modify_multiple_shards cannot run inside a transaction block -CONTEXT: SQL statement "SELECT master_modify_multiple_shards(commandText)" -PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM +-- verify that truncate can be aborted +INSERT INTO test_truncate_range VALUES (1); +BEGIN; TRUNCATE TABLE test_truncate_range; ROLLBACK; +SELECT count(*) FROM test_truncate_range; + count +------- + 1 +(1 row) + DROP TABLE test_truncate_range; -- -- truncate for hash distribution. 
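[editor's note, not part of the patch] The truncate tests above roll back because TRUNCATE on a distributed table fires citus_truncate_trigger(), which routes through master_modify_multiple_shards(); with this patch that function builds a task list and executes it inside the coordinator's transaction instead of calling PreventTransactionChain. A minimal sketch of the new call path, assuming the signatures introduced elsewhere in this diff (ModifyMultipleShardsTaskList is file-static in master_modify_multiple_shards.c, and the wrapper below is hypothetical):

    /*
     * Hypothetical sketch: execute a deparsed multi-shard modification.
     * Each pruned shard gets one task; ExecuteModifyTasks() then sends the
     * per-shard queries over remote transactions that are only committed,
     * via 2PC, when the coordinator's transaction commits -- which is what
     * makes BEGIN; TRUNCATE ...; ROLLBACK work in the tests above.
     */
    static int32
    RunMultiShardModify(Query *modifyQuery, List *prunedShardIntervalList,
                        Oid relationId)
    {
        List *taskList = ModifyMultipleShardsTaskList(modifyQuery,
                                                      prunedShardIntervalList,
                                                      relationId);

        /* no RETURNING, no parameters, no tuple store needed */
        return (int32) ExecuteModifyTasks(taskList, false, NULL, NULL, NULL);
    }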
@@ -226,11 +230,15 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::reg 1210009 (4 rows) --- command can not be run inside transaction -BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT; -ERROR: master_modify_multiple_shards cannot run inside a transaction block -CONTEXT: SQL statement "SELECT master_modify_multiple_shards(commandText)" -PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM +-- verify that truncate can be aborted +INSERT INTO test_truncate_hash VALUES (1); +BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK; +SELECT count(*) FROM test_truncate_hash; + count +------- + 1 +(1 row) + DROP TABLE test_truncate_hash; -- test with table with spaces in it CREATE TABLE "a b hash" (a int, b int); diff --git a/src/test/regress/sql/multi_shard_modify.sql b/src/test/regress/sql/multi_shard_modify.sql index 9b1fabf26..6564e6f11 100644 --- a/src/test/regress/sql/multi_shard_modify.sql +++ b/src/test/regress/sql/multi_shard_modify.sql @@ -46,11 +46,15 @@ COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'c \. -- Testing master_modify_multiple_shards --- Verify that master_modify_multiple_shards cannot be called in a transaction block + +-- Verify that master_modify_multiple_shards can be rolled back BEGIN; SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13'); +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 202'); ROLLBACK; +SELECT count(*) FROM multi_shard_modify_test; + -- Check that master_modify_multiple_shards cannot be called with non-distributed tables CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text); INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder'); diff --git a/src/test/regress/sql/multi_truncate.sql b/src/test/regress/sql/multi_truncate.sql index 4abc9f27e..dcd5ea306 100644 --- a/src/test/regress/sql/multi_truncate.sql +++ b/src/test/regress/sql/multi_truncate.sql @@ -89,8 +89,10 @@ SELECT count(*) FROM test_truncate_range; -- verify 3 shards are still present SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; --- command can not be run inside transaction -BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT; +-- verify that truncate can be aborted +INSERT INTO test_truncate_range VALUES (1); +BEGIN; TRUNCATE TABLE test_truncate_range; ROLLBACK; +SELECT count(*) FROM test_truncate_range; DROP TABLE test_truncate_range; @@ -136,8 +138,10 @@ SELECT count(*) FROM test_truncate_hash; -- verify 4 shards are still presents SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; --- command can not be run inside transaction -BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT; +-- verify that truncate can be aborted +INSERT INTO test_truncate_hash VALUES (1); +BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK; +SELECT count(*) FROM test_truncate_hash; DROP TABLE test_truncate_hash; @@ -173,4 +177,4 @@ TRUNCATE TABLE "a b append"; -- verify all shards are dropped SELECT shardid FROM pg_dist_shard where logicalrelid = '"a b append"'::regclass; -DROP TABLE "a b append"; \ No newline at end of file +DROP TABLE "a b append";
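[editor's note, not part of the patch] The core locking change is the split of LockShards() into LockShardListMetadata() and LockShardListResources() in resource_lock.c above. A hedged sketch of how callers combine the two primitives, condensed from the CopyToExistingShards and ExecuteModifyTasks hunks in this diff:

    /*
     * Sketch, condensed from this patch: COPY takes both locks in ShareLock
     * mode, so concurrent COPYs can proceed while placement changes are
     * blocked. Multi-shard DML instead takes ExclusiveLock on the shard
     * resources, so it conflicts with COPY and with other multi-shard
     * modifications. LockShardListResources sorts by shard ID before locking
     * (see the SortList call above), so two concurrent multi-shard commands
     * acquire locks in the same order and cannot deadlock on each other.
     */

    /* COPY into existing shards (CopyToExistingShards): */
    LockShardListMetadata(shardIntervalList, ShareLock);
    LockShardListResources(shardIntervalList, ShareLock);

    /* multi-shard UPDATE/DELETE (ExecuteModifyTasks): */
    LockShardListResources(shardIntervalList, ExclusiveLock);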