diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 899384ad5..cdef7ab97 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -53,6 +53,7 @@ #include "distributed/coordinator_protocol.h" #include "distributed/deparser.h" #include "distributed/deparse_shard_query.h" +#include "distributed/executor_util.h" #include "distributed/foreign_key_relationship.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index b9b1163e4..4bb2d5e57 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -141,6 +141,7 @@ #include "distributed/connection_management.h" #include "distributed/commands/multi_copy.h" #include "distributed/deparse_shard_query.h" +#include "distributed/executor_util.h" #include "distributed/shared_connection_stats.h" #include "distributed/distributed_execution_locks.h" #include "distributed/intermediate_result_pruning.h" @@ -655,14 +656,7 @@ static void SequentialRunDistributedExecution(DistributedExecution *execution); static void FinishDistributedExecution(DistributedExecution *execution); static void CleanUpSessions(DistributedExecution *execution); -static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan); -static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution); -static bool ModifiedTableReplicated(List *taskList); static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution); -static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); -static bool DistributedExecutionRequiresRollback(List *taskList); -static bool TaskListRequires2PC(List *taskList); -static bool SelectForUpdateOnReferenceTable(List *taskList); static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution); static void UnclaimAllSessionConnections(List *sessionList); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); @@ -718,10 +712,6 @@ static bool CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution static void PlacementExecutionReady(TaskPlacementExecution *placementExecution); static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution * shardCommandExecution); -static bool HasDependentJobs(Job *mainJob); -static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, - Oid **parameterTypes, - const char ***parameterValues); static int GetEventSetSize(List *sessionList); static bool ProcessSessionsWithFailedWaitEventSetOperations( DistributedExecution *execution); @@ -737,14 +727,10 @@ static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount); #endif static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime); -static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values); -static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc); static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey); static void SetAttributeInputMetadata(DistributedExecution *execution, ShardCommandExecution *shardCommandExecution); -static void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName, - 
int *nodePort); -static bool IsDummyPlacement(ShardPlacement *taskPlacement); + /* * AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor @@ -830,7 +816,7 @@ AdaptiveExecutor(CitusScanState *scanState) paramListInfo); } - bool hasDependentJobs = HasDependentJobs(job); + bool hasDependentJobs = job->dependentJobList != NIL; if (hasDependentJobs) { /* jobs use intermediate results, which require a distributed transaction */ @@ -915,17 +901,6 @@ AdaptiveExecutor(CitusScanState *scanState) } -/* - * HasDependentJobs returns true if there is any dependent job - * for the mainjob(top level) job. - */ -static bool -HasDependentJobs(Job *mainJob) -{ - return list_length(mainJob->dependentJobList) > 0; -} - - /* * RunLocalExecution runs the localTaskList in the execution, fills the tuplestore * and sets the es_processed if necessary. @@ -1268,7 +1243,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (DistributedExecutionRequiresRollback(taskList)) + if (TaskListRequiresRollback(taskList)) { /* transaction blocks are required if the task list needs to roll back */ xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; @@ -1328,8 +1303,11 @@ StartDistributedExecution(DistributedExecution *execution) * to the first worker in a transaction block, which activates a coordinated * transaction. We need to do this before determining whether the execution * should use transaction blocks (see below). + * + * We acquire the locks for both the remote and local tasks. */ - AcquireExecutorShardLocksForExecution(execution); + AcquireExecutorShardLocksForExecution(execution->modLevel, + execution->remoteAndLocalTaskList); /* * We should not record parallel access if the target pool size is less than 2. @@ -1372,546 +1350,6 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution) } -/* - * DistributedPlanModifiesDatabase returns true if the plan modifies the data - * or the schema. - */ -bool -DistributedPlanModifiesDatabase(DistributedPlan *plan) -{ - return TaskListModifiesDatabase(plan->modLevel, plan->workerJob->taskList); -} - - -/* - * TaskListModifiesDatabase is a helper function for DistributedExecutionModifiesDatabase and - * DistributedPlanModifiesDatabase. - */ -static bool -TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList) -{ - if (modLevel > ROW_MODIFY_READONLY) - { - return true; - } - - /* - * If we cannot decide by only checking the row modify level, - * we should look closer to the tasks. - */ - if (list_length(taskList) < 1) - { - /* is this ever possible? */ - return false; - } - - Task *firstTask = (Task *) linitial(taskList); - - return !ReadOnlyTask(firstTask->taskType); -} - - -/* - * DistributedExecutionRequiresRollback returns true if the distributed - * execution should start a CoordinatedTransaction. In other words, if the - * function returns true, the execution sends BEGIN; to every connection - * involved in the distributed execution. - */ -static bool -DistributedExecutionRequiresRollback(List *taskList) -{ - int taskCount = list_length(taskList); - - if (taskCount == 0) - { - return false; - } - - Task *task = (Task *) linitial(taskList); - if (task->cannotBeExecutedInTransction) - { - /* vacuum, create index concurrently etc. 
*/ - return false; - } - - bool selectForUpdate = task->relationRowLockList != NIL; - if (selectForUpdate) - { - /* - * Do not check SelectOpensTransactionBlock, always open transaction block - * if SELECT FOR UPDATE is executed inside a distributed transaction. - */ - return IsMultiStatementTransaction(); - } - - if (ReadOnlyTask(task->taskType)) - { - return SelectOpensTransactionBlock && - IsTransactionBlock(); - } - - if (IsMultiStatementTransaction()) - { - return true; - } - - if (list_length(taskList) > 1) - { - return true; - } - - if (list_length(task->taskPlacementList) > 1) - { - /* - * Single DML/DDL tasks with replicated tables (including - * reference and non-reference tables) should require - * BEGIN/COMMIT/ROLLBACK. - */ - return true; - } - - if (task->queryCount > 1) - { - /* - * When there are multiple sequential queries in a task - * we need to run those as a transaction. - */ - return true; - } - - return false; -} - - -/* - * TaskListRequires2PC determines whether the given task list requires 2PC. - */ -static bool -TaskListRequires2PC(List *taskList) -{ - if (taskList == NIL) - { - return false; - } - - Task *task = (Task *) linitial(taskList); - if (ReadOnlyTask(task->taskType)) - { - /* we do not trigger 2PC for ReadOnly queries */ - return false; - } - - bool singleTask = list_length(taskList) == 1; - if (singleTask && list_length(task->taskPlacementList) == 1) - { - /* we do not trigger 2PC for modifications that are: - * - single task - * - single placement - */ - return false; - } - - /* - * Otherwise, all modifications are done via 2PC. This includes: - * - Multi-shard commands irrespective of the replication factor - * - Single-shard commands that are targeting more than one replica - */ - return true; -} - - -/* - * ReadOnlyTask returns true if the input task does a read-only operation - * on the database. - */ -bool -ReadOnlyTask(TaskType taskType) -{ - switch (taskType) - { - case READ_TASK: - case MAP_OUTPUT_FETCH_TASK: - case MAP_TASK: - case MERGE_TASK: - { - return true; - } - - default: - { - return false; - } - } -} - - -/* - * TaskListCannotBeExecutedInTransaction returns true if any of the - * tasks in the input cannot be executed in a transaction. These are - * tasks like VACUUM or CREATE INDEX CONCURRENTLY etc. - */ -bool -TaskListCannotBeExecutedInTransaction(List *taskList) -{ - Task *task = NULL; - foreach_ptr(task, taskList) - { - if (task->cannotBeExecutedInTransction) - { - return true; - } - } - - return false; -} - - -/* - * SelectForUpdateOnReferenceTable returns true if the input task - * contains a FOR UPDATE clause that locks any reference tables. - */ -static bool -SelectForUpdateOnReferenceTable(List *taskList) -{ - if (list_length(taskList) != 1) - { - /* we currently do not support SELECT FOR UPDATE on multi task queries */ - return false; - } - - Task *task = (Task *) linitial(taskList); - RelationRowLock *relationRowLock = NULL; - foreach_ptr(relationRowLock, task->relationRowLockList) - { - Oid relationId = relationRowLock->relationId; - - if (IsCitusTableType(relationId, REFERENCE_TABLE)) - { - return true; - } - } - - return false; -} - - -/* - * LockPartitionsForDistributedPlan ensures commands take locks on all partitions - * of a distributed table that appears in the query. We do this primarily out of - * consistency with PostgreSQL locking. 
- */ -static void -LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan) -{ - if (DistributedPlanModifiesDatabase(distributedPlan)) - { - Oid targetRelationId = distributedPlan->targetRelationId; - - LockPartitionsInRelationList(list_make1_oid(targetRelationId), RowExclusiveLock); - } - - /* - * Lock partitions of tables that appear in a SELECT or subquery. In the - * DML case this also includes the target relation, but since we already - * have a stronger lock this doesn't do any harm. - */ - LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); -} - - -/* - * AcquireExecutorShardLocksForExecution acquires advisory lock on shard IDs - * to prevent unsafe concurrent modifications of shards. - * - * We prevent concurrent modifications of shards in two cases: - * 1. Any non-commutative writes to a replicated table - * 2. Multi-shard writes that are executed in parallel - * - * The first case ensures we do not apply updates in different orders on - * different replicas (e.g. of a reference table), which could lead the - * replicas to diverge. - * - * The second case prevents deadlocks due to out-of-order execution. - * - * There are two GUCs that can override the default behaviors. - * 'citus.all_modifications_commutative' relaxes locking - * that's done for the purpose of keeping replicas consistent. - * 'citus.enable_deadlock_prevention' relaxes locking done for - * the purpose of avoiding deadlocks between concurrent - * multi-shard commands. - * - * We do not take executor shard locks for utility commands such as - * TRUNCATE because the table locks already prevent concurrent access. - */ -static void -AcquireExecutorShardLocksForExecution(DistributedExecution *execution) -{ - RowModifyLevel modLevel = execution->modLevel; - - /* acquire the locks for both the remote and local tasks */ - List *taskList = execution->remoteAndLocalTaskList; - - if (modLevel <= ROW_MODIFY_READONLY && - !SelectForUpdateOnReferenceTable(taskList)) - { - /* - * Executor locks only apply to DML commands and SELECT FOR UPDATE queries - * touching reference tables. - */ - return; - } - - bool requiresParallelExecutionLocks = - !(list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList)); - - bool modifiedTableReplicated = ModifiedTableReplicated(taskList); - if (!modifiedTableReplicated && !requiresParallelExecutionLocks) - { - /* - * When a distributed query on tables with replication - * factor == 1 and command hits only a single shard, we - * rely on Postgres to handle the serialization of the - * concurrent modifications on the workers. - * - * For reference tables, even if their placements are replicated - * ones (e.g., single node), we acquire the distributed execution - * locks to be consistent when new node(s) are added. So, they - * do not return at this point. - */ - return; - } - - /* - * We first assume that all the remaining modifications are going to - * be serialized. So, start with an ExclusiveLock and lower the lock level - * as much as possible. - */ - int lockMode = ExclusiveLock; - - /* - * In addition to honouring commutativity rules, we currently only - * allow a single multi-shard command on a shard at a time. Otherwise, - * concurrent multi-shard commands may take row-level locks on the - * shard placements in a different order and create a distributed - * deadlock. This applies even when writes are commutative and/or - * there is no replication. This can be relaxed via - * EnableDeadlockPrevention. - * - * 1. 
If citus.all_modifications_commutative is set to true, then all locks - * are acquired as RowExclusiveLock. - * - * 2. If citus.all_modifications_commutative is false, then only the shards - * with more than one replicas are locked with ExclusiveLock. Otherwise, the - * lock is acquired with ShareUpdateExclusiveLock. - * - * ShareUpdateExclusiveLock conflicts with itself such that only one - * multi-shard modification at a time is allowed on a shard. It also conflicts - * with ExclusiveLock, which ensures that updates/deletes/upserts are applied - * in the same order on all placements. It does not conflict with - * RowExclusiveLock, which is normally obtained by single-shard, commutative - * writes. - */ - if (!modifiedTableReplicated && requiresParallelExecutionLocks) - { - /* - * When there is no replication then we only need to prevent - * concurrent multi-shard commands on the same shards. This is - * because concurrent, parallel commands may modify the same - * set of shards, but in different orders. The order of the - * accesses might trigger distributed deadlocks that are not - * possible to happen on non-distributed systems such - * regular Postgres. - * - * As an example, assume that we have two queries: query-1 and query-2. - * Both queries access shard-1 and shard-2. If query-1 first accesses to - * shard-1 then shard-2, and query-2 accesses shard-2 then shard-1, these - * two commands might block each other in case they modify the same rows - * (e.g., cause distributed deadlocks). - * - * In either case, ShareUpdateExclusive has the desired effect, since - * it conflicts with itself and ExclusiveLock (taken by non-commutative - * writes). - * - * However, some users find this too restrictive, so we allow them to - * reduce to a RowExclusiveLock when citus.enable_deadlock_prevention - * is enabled, which lets multi-shard modifications run in parallel as - * long as they all disable the GUC. - */ - lockMode = - EnableDeadlockPrevention ? ShareUpdateExclusiveLock : RowExclusiveLock; - - if (!IsCoordinator()) - { - /* - * We also skip taking a heavy-weight lock when running a multi-shard - * commands from workers, since we currently do not prevent concurrency - * across workers anyway. - */ - lockMode = RowExclusiveLock; - } - } - else if (modifiedTableReplicated) - { - /* - * When we are executing distributed queries on replicated tables, our - * default behaviour is to prevent any concurrency. This is valid - * for when parallel execution is happening or not. - * - * The reason is that we cannot control the order of the placement accesses - * of two distributed queries to the same shards. The order of the accesses - * might cause the replicas of the same shard placements diverge. This is - * not possible to happen on non-distributed systems such regular Postgres. - * - * As an example, assume that we have two queries: query-1 and query-2. - * Both queries only access the placements of shard-1, say p-1 and p-2. - * - * And, assume that these queries are non-commutative, such as: - * query-1: UPDATE table SET b = 1 WHERE key = 1; - * query-2: UPDATE table SET b = 2 WHERE key = 1; - * - * If query-1 accesses to p-1 then p-2, and query-2 accesses - * p-2 then p-1, these two commands would leave the p-1 and p-2 - * diverged (e.g., the values for the column "b" would be different). - * - * The only exception to this rule is the single shard commutative - * modifications, such as INSERTs. 
In that case, we can allow - * concurrency among such backends, hence lowering the lock level - * to RowExclusiveLock. - */ - if (!requiresParallelExecutionLocks && modLevel < ROW_MODIFY_NONCOMMUTATIVE) - { - lockMode = RowExclusiveLock; - } - } - - if (AllModificationsCommutative) - { - /* - * The mapping is overridden when all_modifications_commutative is set to true. - * In that case, all modifications are treated as commutative, which can be used - * to communicate that the application is only generating commutative - * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary. This - * is irrespective of single-shard/multi-shard or replicated tables. - */ - lockMode = RowExclusiveLock; - } - - /* now, iterate on the tasks and acquire the executor locks on the shards */ - List *anchorShardIntervalList = NIL; - List *relationRowLockList = NIL; - List *requiresConsistentSnapshotRelationShardList = NIL; - - Task *task = NULL; - foreach_ptr(task, taskList) - { - ShardInterval *anchorShardInterval = LoadShardInterval(task->anchorShardId); - anchorShardIntervalList = lappend(anchorShardIntervalList, anchorShardInterval); - - /* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */ - AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList); - - relationRowLockList = - list_concat(relationRowLockList, - task->relationRowLockList); - - /* - * If the task has a subselect, then we may need to lock the shards from which - * the query selects as well to prevent the subselects from seeing different - * results on different replicas. - */ - if (RequiresConsistentSnapshot(task)) - { - /* - * ExclusiveLock conflicts with all lock types used by modifications - * and therefore prevents other modifications from running - * concurrently. - */ - requiresConsistentSnapshotRelationShardList = - list_concat(requiresConsistentSnapshotRelationShardList, - task->relationShardList); - } - } - - /* - * Acquire the locks in a sorted way to avoid deadlocks due to lock - * ordering across concurrent sessions. - */ - anchorShardIntervalList = - SortList(anchorShardIntervalList, CompareShardIntervalsById); - - /* - * If we are dealing with a partition we are also taking locks on parent table - * to prevent deadlocks on concurrent operations on a partition and its parent. - * - * Note that this function currently does not acquire any remote locks as that - * is necessary to control the concurrency across multiple nodes for replicated - * tables. That is because Citus currently does not allow modifications to - * partitions from any node other than the coordinator. - */ - LockParentShardResourceIfPartition(anchorShardIntervalList, lockMode); - - /* Acquire distribution execution locks on the affected shards */ - SerializeNonCommutativeWrites(anchorShardIntervalList, lockMode); - - if (relationRowLockList != NIL) - { - /* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */ - AcquireExecutorShardLocksForRelationRowLockList(relationRowLockList); - } - - - if (requiresConsistentSnapshotRelationShardList != NIL) - { - /* - * If the task has a subselect, then we may need to lock the shards from which - * the query selects as well to prevent the subselects from seeing different - * results on different replicas. - * - * ExclusiveLock conflicts with all lock types used by modifications - * and therefore prevents other modifications from running - * concurrently. 
- */ - LockRelationShardResources(requiresConsistentSnapshotRelationShardList, - ExclusiveLock); - } -} - - -/* - * ModifiedTableReplicated iterates on the task list and returns true - * if any of the tasks' anchor shard is a replicated table. We qualify - * replicated tables as any reference table or any distributed table with - * replication factor > 1. - */ -static bool -ModifiedTableReplicated(List *taskList) -{ - Task *task = NULL; - foreach_ptr(task, taskList) - { - int64 shardId = task->anchorShardId; - - if (shardId == INVALID_SHARD_ID) - { - continue; - } - - if (ReferenceTableShardId(shardId)) - { - return true; - } - - Oid relationId = RelationIdForShard(shardId); - if (!SingleReplicatedTable(relationId)) - { - return true; - } - } - - return false; -} - - /* * FinishDistributedExecution cleans up resources associated with a * distributed execution. @@ -1927,111 +1365,6 @@ FinishDistributedExecution(DistributedExecution *execution) } -/* - * CleanUpSessions does any clean-up necessary for the session used - * during the execution. We only reach the function after successfully - * completing all the tasks and we expect no tasks are still in progress. - */ -static void -CleanUpSessions(DistributedExecution *execution) -{ - List *sessionList = execution->sessionList; - - /* we get to this function only after successful executions */ - Assert(!execution->failed && execution->unfinishedTaskCount == 0); - - /* always trigger wait event set in the first round */ - WorkerSession *session = NULL; - foreach_ptr(session, sessionList) - { - MultiConnection *connection = session->connection; - - ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld " - "to node %s:%d", session->sessionId, - session->commandsSent, - connection->hostname, connection->port))); - - UnclaimConnection(connection); - - if (connection->connectionState == MULTI_CONNECTION_CONNECTING || - connection->connectionState == MULTI_CONNECTION_FAILED || - connection->connectionState == MULTI_CONNECTION_LOST || - connection->connectionState == MULTI_CONNECTION_TIMED_OUT) - { - /* - * We want the MultiConnection go away and not used in - * the subsequent executions. - * - * We cannot get MULTI_CONNECTION_LOST via the ConnectionStateMachine, - * but we might get it via the connection API and find us here before - * changing any states in the ConnectionStateMachine. - * - */ - CloseConnection(connection); - } - else if (connection->connectionState == MULTI_CONNECTION_CONNECTED) - { - RemoteTransaction *transaction = &(connection->remoteTransaction); - RemoteTransactionState transactionState = transaction->transactionState; - - if (transactionState == REMOTE_TRANS_CLEARING_RESULTS) - { - /* - * We might have established the connection, and even sent BEGIN, but not - * get to the point where we assigned a task to this specific connection - * (because other connections in the pool already finished all the tasks). - */ - Assert(session->commandsSent == 0); - - ClearResults(connection, false); - } - else if (!(transactionState == REMOTE_TRANS_NOT_STARTED || - transactionState == REMOTE_TRANS_STARTED)) - { - /* - * We don't have to handle anything else. Note that the execution - * could only finish on connectionStates of MULTI_CONNECTION_CONNECTING, - * MULTI_CONNECTION_FAILED and MULTI_CONNECTION_CONNECTED. The first two - * are already handled above. 
- * - * When we're on MULTI_CONNECTION_CONNECTED, TransactionStateMachine - * ensures that all the necessary commands are successfully sent over - * the connection and everything is cleared up. Otherwise, we'd have been - * on MULTI_CONNECTION_FAILED state. - */ - ereport(WARNING, (errmsg("unexpected transaction state at the end of " - "execution: %d", transactionState))); - } - - /* get ready for the next executions if we need use the same connection */ - connection->waitFlags = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; - } - else - { - ereport(WARNING, (errmsg("unexpected connection state at the end of " - "execution: %d", connection->connectionState))); - } - } -} - - -/* - * UnclaimAllSessionConnections unclaims all of the connections for the given - * sessionList. - */ -static void -UnclaimAllSessionConnections(List *sessionList) -{ - WorkerSession *session = NULL; - foreach_ptr(session, sessionList) - { - MultiConnection *connection = session->connection; - - UnclaimConnection(connection); - } -} - - /* * AssignTasksToConnectionsOrWorkerPool goes through the list of tasks to determine whether any * task placements need to be assigned to particular connections because of preceding @@ -2222,48 +1555,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) } -/* - * LookupTaskPlacementHostAndPort sets the nodename and nodeport for the given task placement - * with a lookup. - */ -static void -LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName, - int *nodePort) -{ - if (IsDummyPlacement(taskPlacement)) - { - /* - * If we create a dummy placement for the local node, it is possible - * that the entry doesn't exist in pg_dist_node, hence a lookup will fail. - * In that case we want to use the dummy placements values. - */ - *nodeName = taskPlacement->nodeName; - *nodePort = taskPlacement->nodePort; - } - else - { - /* - * We want to lookup the node information again since it is possible that - * there were changes in pg_dist_node and we will get those invalidations - * in LookupNodeForGroup. - */ - WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId); - *nodeName = workerNode->workerName; - *nodePort = workerNode->workerPort; - } -} - - -/* - * IsDummyPlacement returns true if the given placement is a dummy placement. - */ -static bool -IsDummyPlacement(ShardPlacement *taskPlacement) -{ - return taskPlacement->nodeId == LOCAL_NODE_ID; -} - - /* * WorkerPoolCompare is based on WorkerNodeCompare function. The function * compares two worker nodes by their host name and port number. @@ -2510,40 +1801,6 @@ RemoteSocketClosedForAnySession(DistributedExecution *execution) #endif -/* - * ShouldRunTasksSequentially returns true if each of the individual tasks - * should be executed one by one. Note that this is different than - * MultiShardConnectionType == SEQUENTIAL_CONNECTION case. In that case, - * running the tasks across the nodes in parallel is acceptable and implemented - * in that way. - * - * However, the executions that are qualified here would perform poorly if the - * tasks across the workers are executed in parallel. We currently qualify only - * one class of distributed queries here, multi-row INSERTs. If we do not enforce - * true sequential execution, concurrent multi-row upserts could easily form - * a distributed deadlock when the upserts touch the same rows. 
- */ -bool -ShouldRunTasksSequentially(List *taskList) -{ - if (list_length(taskList) < 2) - { - /* single task plans are already qualified as sequential by definition */ - return false; - } - - /* all the tasks are the same, so we only look one */ - Task *initialTask = (Task *) linitial(taskList); - if (initialTask->rowValuesLists != NIL) - { - /* found a multi-row INSERT */ - return true; - } - - return false; -} - - /* * SequentialRunDistributedExecution gets a distributed execution and * executes each individual task in the execution sequentially, one @@ -4909,120 +4166,6 @@ ReceiveResults(WorkerSession *session, bool storeRows) } -/* - * TupleDescGetAttBinaryInMetadata - Build an AttInMetadata structure based on - * the supplied TupleDesc. AttInMetadata can be used in conjunction with - * fmStringInfos containing binary encoded types to produce a properly formed - * tuple. - * - * NOTE: This function is a copy of the PG function TupleDescGetAttInMetadata, - * except that it uses getTypeBinaryInputInfo instead of getTypeInputInfo. - */ -static AttInMetadata * -TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc) -{ - int natts = tupdesc->natts; - int i; - Oid atttypeid; - Oid attinfuncid; - - AttInMetadata *attinmeta = (AttInMetadata *) palloc(sizeof(AttInMetadata)); - - /* "Bless" the tupledesc so that we can make rowtype datums with it */ - attinmeta->tupdesc = BlessTupleDesc(tupdesc); - - /* - * Gather info needed later to call the "in" function for each attribute - */ - FmgrInfo *attinfuncinfo = (FmgrInfo *) palloc0(natts * sizeof(FmgrInfo)); - Oid *attioparams = (Oid *) palloc0(natts * sizeof(Oid)); - int32 *atttypmods = (int32 *) palloc0(natts * sizeof(int32)); - - for (i = 0; i < natts; i++) - { - Form_pg_attribute att = TupleDescAttr(tupdesc, i); - - /* Ignore dropped attributes */ - if (!att->attisdropped) - { - atttypeid = att->atttypid; - getTypeBinaryInputInfo(atttypeid, &attinfuncid, &attioparams[i]); - fmgr_info(attinfuncid, &attinfuncinfo[i]); - atttypmods[i] = att->atttypmod; - } - } - attinmeta->attinfuncs = attinfuncinfo; - attinmeta->attioparams = attioparams; - attinmeta->atttypmods = atttypmods; - - return attinmeta; -} - - -/* - * BuildTupleFromBytes - build a HeapTuple given user data in binary form. - * values is an array of StringInfos, one for each attribute of the return - * tuple. A NULL StringInfo pointer indicates we want to create a NULL field. - * - * NOTE: This function is a copy of the PG function BuildTupleFromCStrings, - * except that it uses ReceiveFunctionCall instead of InputFunctionCall. - */ -static HeapTuple -BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values) -{ - TupleDesc tupdesc = attinmeta->tupdesc; - int natts = tupdesc->natts; - int i; - - Datum *dvalues = (Datum *) palloc(natts * sizeof(Datum)); - bool *nulls = (bool *) palloc(natts * sizeof(bool)); - - /* - * Call the "in" function for each non-dropped attribute, even for nulls, - * to support domains. 
*/ - for (i = 0; i < natts; i++) - { - if (!TupleDescAttr(tupdesc, i)->attisdropped) - { - /* Non-dropped attributes */ - dvalues[i] = ReceiveFunctionCall(&attinmeta->attinfuncs[i], - values[i], - attinmeta->attioparams[i], - attinmeta->atttypmods[i]); - if (values[i] != NULL) - { - nulls[i] = false; - } - else - { - nulls[i] = true; - } - } - else - { - /* Handle dropped attributes by setting to NULL */ - dvalues[i] = (Datum) 0; - nulls[i] = true; - } - } - - /* - * Form a tuple - */ - HeapTuple tuple = heap_form_tuple(tupdesc, dvalues, nulls); - - /* - * Release locally palloc'd space. XXX would probably be good to pfree - * values of pass-by-reference datums, as well. - */ - pfree(dvalues); - pfree(nulls); - - return tuple; -} - - /* * WorkerPoolFailed marks a worker pool and all the placement executions scheduled * on it as failed. @@ -5698,6 +4841,111 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) } +/* + * CleanUpSessions does any clean-up necessary for the session used + * during the execution. We only reach the function after successfully + * completing all the tasks and we expect no tasks are still in progress. + */ +static void +CleanUpSessions(DistributedExecution *execution) +{ + List *sessionList = execution->sessionList; + + /* we get to this function only after successful executions */ + Assert(!execution->failed && execution->unfinishedTaskCount == 0); + + /* always trigger wait event set in the first round */ + WorkerSession *session = NULL; + foreach_ptr(session, sessionList) + { + MultiConnection *connection = session->connection; + + ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld " + "to node %s:%d", session->sessionId, + session->commandsSent, + connection->hostname, connection->port))); + + UnclaimConnection(connection); + + if (connection->connectionState == MULTI_CONNECTION_CONNECTING || + connection->connectionState == MULTI_CONNECTION_FAILED || + connection->connectionState == MULTI_CONNECTION_LOST || + connection->connectionState == MULTI_CONNECTION_TIMED_OUT) + { + /* + * We want the MultiConnection to go away and not be used in + * the subsequent executions. + * + * We cannot get MULTI_CONNECTION_LOST via the ConnectionStateMachine, + * but we might get it via the connection API and find us here before + * changing any states in the ConnectionStateMachine. + * + */ + CloseConnection(connection); + } + else if (connection->connectionState == MULTI_CONNECTION_CONNECTED) + { + RemoteTransaction *transaction = &(connection->remoteTransaction); + RemoteTransactionState transactionState = transaction->transactionState; + + if (transactionState == REMOTE_TRANS_CLEARING_RESULTS) + { + /* + * We might have established the connection, and even sent BEGIN, but never + * got to the point where we assigned a task to this specific connection + * (because other connections in the pool already finished all the tasks). + */ + Assert(session->commandsSent == 0); + + ClearResults(connection, false); + } + else if (!(transactionState == REMOTE_TRANS_NOT_STARTED || + transactionState == REMOTE_TRANS_STARTED)) + { + /* + * We don't have to handle anything else. Note that the execution + * could only finish on connectionStates of MULTI_CONNECTION_CONNECTING, + * MULTI_CONNECTION_FAILED and MULTI_CONNECTION_CONNECTED. The first two + * are already handled above.
* + * When we're on MULTI_CONNECTION_CONNECTED, TransactionStateMachine + * ensures that all the necessary commands are successfully sent over + * the connection and everything is cleared up. Otherwise, we'd have been + * in the MULTI_CONNECTION_FAILED state. + */ + ereport(WARNING, (errmsg("unexpected transaction state at the end of " + "execution: %d", transactionState))); + } + + /* get ready for the next executions if we need to use the same connection */ + connection->waitFlags = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; + } + else + { + ereport(WARNING, (errmsg("unexpected connection state at the end of " + "execution: %d", connection->connectionState))); + } + } +} + + +/* + * UnclaimAllSessionConnections unclaims all of the connections for the given + * sessionList. + */ +static void +UnclaimAllSessionConnections(List *sessionList) +{ + WorkerSession *session = NULL; + foreach_ptr(session, sessionList) + { + MultiConnection *connection = session->connection; + + UnclaimConnection(connection); + } +} + + /* * SetLocalForceMaxQueryParallelization is simply a C interface for setting * the following: @@ -5710,89 +4958,3 @@ SetLocalForceMaxQueryParallelization(void) (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, GUC_ACTION_LOCAL, true, 0, false); } - - -/* - * ExtractParametersForRemoteExecution extracts parameter types and values from - * the given ParamListInfo structure, and fills parameter type and value arrays. - * It changes oid of custom types to InvalidOid so that they are the same in workers - * and coordinators. - */ -static void -ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, - const char ***parameterValues) -{ - ExtractParametersFromParamList(paramListInfo, parameterTypes, - parameterValues, false); -} - - -/* - * ExtractParametersFromParamList extracts parameter types and values from - * the given ParamListInfo structure, and fills parameter type and value arrays. - * If useOriginalCustomTypeOids is true, it uses the original oids for custom types. - */ -void -ExtractParametersFromParamList(ParamListInfo paramListInfo, - Oid **parameterTypes, - const char ***parameterValues, bool - useOriginalCustomTypeOids) -{ - int parameterCount = paramListInfo->numParams; - - *parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid)); - *parameterValues = (const char **) palloc0(parameterCount * sizeof(char *)); - - /* get parameter types and values */ - for (int parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++) - { - ParamExternData *parameterData = &paramListInfo->params[parameterIndex]; - Oid typeOutputFunctionId = InvalidOid; - bool variableLengthType = false; - - /* - * Use 0 for data types where the oid values can be different on - * the coordinator and worker nodes. Therefore, the worker nodes can - * infer the correct oid. - */ - if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids) - { - (*parameterTypes)[parameterIndex] = 0; - } - else - { - (*parameterTypes)[parameterIndex] = parameterData->ptype; - } - - /* - * If the parameter is not referenced / used (ptype == 0) and - * would otherwise have errored out inside standard_planner()), - * don't pass a value to the remote side, and pass text oid to prevent - * undetermined data type errors on workers.
- */ - if (parameterData->ptype == 0) - { - (*parameterValues)[parameterIndex] = NULL; - (*parameterTypes)[parameterIndex] = TEXTOID; - - continue; - } - - /* - * If the parameter is NULL then we preserve its type, but - * don't need to evaluate its value. - */ - if (parameterData->isnull) - { - (*parameterValues)[parameterIndex] = NULL; - - continue; - } - - getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId, - &variableLengthType); - - (*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, - parameterData->value); - } -} diff --git a/src/backend/distributed/executor/distributed_execution_locks.c b/src/backend/distributed/executor/distributed_execution_locks.c index 27c6a961d..f7d2fd49d 100644 --- a/src/backend/distributed/executor/distributed_execution_locks.c +++ b/src/backend/distributed/executor/distributed_execution_locks.c @@ -9,6 +9,7 @@ *------------------------------------------------------------------------- */ #include "distributed/distributed_execution_locks.h" +#include "distributed/executor_util.h" #include "distributed/listutils.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" @@ -19,6 +20,259 @@ #include "distributed/transaction_management.h" +/* + * AcquireExecutorShardLocksForExecution acquires advisory lock on shard IDs + * to prevent unsafe concurrent modifications of shards. + * + * We prevent concurrent modifications of shards in two cases: + * 1. Any non-commutative writes to a replicated table + * 2. Multi-shard writes that are executed in parallel + * + * The first case ensures we do not apply updates in different orders on + * different replicas (e.g. of a reference table), which could lead the + * replicas to diverge. + * + * The second case prevents deadlocks due to out-of-order execution. + * + * There are two GUCs that can override the default behaviors. + * 'citus.all_modifications_commutative' relaxes locking + * that's done for the purpose of keeping replicas consistent. + * 'citus.enable_deadlock_prevention' relaxes locking done for + * the purpose of avoiding deadlocks between concurrent + * multi-shard commands. + * + * We do not take executor shard locks for utility commands such as + * TRUNCATE because the table locks already prevent concurrent access. + */ +void +AcquireExecutorShardLocksForExecution(RowModifyLevel modLevel, List *taskList) +{ + if (modLevel <= ROW_MODIFY_READONLY && + !SelectForUpdateOnReferenceTable(taskList)) + { + /* + * Executor locks only apply to DML commands and SELECT FOR UPDATE queries + * touching reference tables. + */ + return; + } + + bool requiresParallelExecutionLocks = + !(list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList)); + + bool modifiedTableReplicated = ModifiedTableReplicated(taskList); + if (!modifiedTableReplicated && !requiresParallelExecutionLocks) + { + /* + * When a distributed query on tables with replication + * factor == 1 and command hits only a single shard, we + * rely on Postgres to handle the serialization of the + * concurrent modifications on the workers. + * + * For reference tables, even if their placements are replicated + * ones (e.g., single node), we acquire the distributed execution + * locks to be consistent when new node(s) are added. So, they + * do not return at this point. + */ + return; + } + + /* + * We first assume that all the remaining modifications are going to + * be serialized. So, start with an ExclusiveLock and lower the lock level + * as much as possible. 
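+ * + * To make the outcome concrete (an illustrative summary of the branches below, not an exhaustive specification): a parallel multi-shard UPDATE on a replication-factor-1 table ends up with ShareUpdateExclusiveLock (or RowExclusiveLock when citus.enable_deadlock_prevention is off), a non-commutative write on a replicated table keeps ExclusiveLock, and a single-shard commutative INSERT into a replicated table, like any command run under citus.all_modifications_commutative, drops to RowExclusiveLock.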
*/ + int lockMode = ExclusiveLock; + + /* + * In addition to honouring commutativity rules, we currently only + * allow a single multi-shard command on a shard at a time. Otherwise, + * concurrent multi-shard commands may take row-level locks on the + * shard placements in a different order and create a distributed + * deadlock. This applies even when writes are commutative and/or + * there is no replication. This can be relaxed via + * EnableDeadlockPrevention. + * + * 1. If citus.all_modifications_commutative is set to true, then all locks + * are acquired as RowExclusiveLock. + * + * 2. If citus.all_modifications_commutative is false, then only the shards + * with more than one replica are locked with ExclusiveLock. Otherwise, the + * lock is acquired with ShareUpdateExclusiveLock. + * + * ShareUpdateExclusiveLock conflicts with itself such that only one + * multi-shard modification at a time is allowed on a shard. It also conflicts + * with ExclusiveLock, which ensures that updates/deletes/upserts are applied + * in the same order on all placements. It does not conflict with + * RowExclusiveLock, which is normally obtained by single-shard, commutative + * writes. + */ + if (!modifiedTableReplicated && requiresParallelExecutionLocks) + { + /* + * When there is no replication then we only need to prevent + * concurrent multi-shard commands on the same shards. This is + * because concurrent, parallel commands may modify the same + * set of shards, but in different orders. The order of the + * accesses might trigger distributed deadlocks that are not + * possible to happen on non-distributed systems such as + * regular Postgres. + * + * As an example, assume that we have two queries: query-1 and query-2. + * Both queries access shard-1 and shard-2. If query-1 first accesses + * shard-1 then shard-2, and query-2 accesses shard-2 then shard-1, these + * two commands might block each other in case they modify the same rows + * (e.g., cause distributed deadlocks). + * + * In either case, ShareUpdateExclusive has the desired effect, since + * it conflicts with itself and ExclusiveLock (taken by non-commutative + * writes). + * + * However, some users find this too restrictive, so we allow them to + * reduce to a RowExclusiveLock when citus.enable_deadlock_prevention + * is disabled, which lets multi-shard modifications run in parallel as + * long as they all disable the GUC. + */ + lockMode = + EnableDeadlockPrevention ? ShareUpdateExclusiveLock : RowExclusiveLock; + + if (!IsCoordinator()) + { + /* + * We also skip taking a heavy-weight lock when running multi-shard + * commands from workers, since we currently do not prevent concurrency + * across workers anyway. + */ + lockMode = RowExclusiveLock; + } + } + else if (modifiedTableReplicated) + { + /* + * When we are executing distributed queries on replicated tables, our + * default behaviour is to prevent any concurrency. This holds + * whether or not parallel execution is happening. + * + * The reason is that we cannot control the order of the placement accesses + * of two distributed queries to the same shards. The order of the accesses + * might cause the placements of the same shard to diverge. This is + * not possible on non-distributed systems such as regular Postgres. + * + * As an example, assume that we have two queries: query-1 and query-2. + * Both queries only access the placements of shard-1, say p-1 and p-2.
* + * And, assume that these queries are non-commutative, such as: + * query-1: UPDATE table SET b = 1 WHERE key = 1; + * query-2: UPDATE table SET b = 2 WHERE key = 1; + * + * If query-1 accesses p-1 then p-2, and query-2 accesses + * p-2 then p-1, these two commands would leave p-1 and p-2 + * diverged (e.g., the values for the column "b" would be different). + * + * The only exception to this rule is single-shard commutative + * modifications, such as INSERTs. In that case, we can allow + * concurrency among such backends, hence lowering the lock level + * to RowExclusiveLock. + */ + if (!requiresParallelExecutionLocks && modLevel < ROW_MODIFY_NONCOMMUTATIVE) + { + lockMode = RowExclusiveLock; + } + } + + if (AllModificationsCommutative) + { + /* + * The mapping is overridden when all_modifications_commutative is set to true. + * In that case, all modifications are treated as commutative, which can be used + * to communicate that the application is only generating commutative + * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary. This + * is irrespective of single-shard/multi-shard or replicated tables. + */ + lockMode = RowExclusiveLock; + } + + /* now, iterate on the tasks and acquire the executor locks on the shards */ + List *anchorShardIntervalList = NIL; + List *relationRowLockList = NIL; + List *requiresConsistentSnapshotRelationShardList = NIL; + + Task *task = NULL; + foreach_ptr(task, taskList) + { + ShardInterval *anchorShardInterval = LoadShardInterval(task->anchorShardId); + anchorShardIntervalList = lappend(anchorShardIntervalList, anchorShardInterval); + + /* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */ + AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList); + + relationRowLockList = + list_concat(relationRowLockList, + task->relationRowLockList); + + /* + * If the task has a subselect, then we may need to lock the shards from which + * the query selects as well to prevent the subselects from seeing different + * results on different replicas. + */ + if (RequiresConsistentSnapshot(task)) + { + /* + * ExclusiveLock conflicts with all lock types used by modifications + * and therefore prevents other modifications from running + * concurrently. + */ + requiresConsistentSnapshotRelationShardList = + list_concat(requiresConsistentSnapshotRelationShardList, + task->relationShardList); + } + } + + /* + * Acquire the locks in a sorted way to avoid deadlocks due to lock + * ordering across concurrent sessions. + */ + anchorShardIntervalList = + SortList(anchorShardIntervalList, CompareShardIntervalsById); + + /* + * If we are dealing with a partition, we also take locks on the parent table + * to prevent deadlocks on concurrent operations on a partition and its parent. + * + * Note that this function currently does not acquire any remote locks, as would + * be necessary to control the concurrency across multiple nodes for replicated + * tables. That is because Citus currently does not allow modifications to + * partitions from any node other than the coordinator. + */ + LockParentShardResourceIfPartition(anchorShardIntervalList, lockMode); + + /* Acquire distribution execution locks on the affected shards */ + SerializeNonCommutativeWrites(anchorShardIntervalList, lockMode); + + if (relationRowLockList != NIL) + { + /* Acquire additional locks for SELECT ..
FOR UPDATE on reference tables */ + AcquireExecutorShardLocksForRelationRowLockList(relationRowLockList); + } + + + if (requiresConsistentSnapshotRelationShardList != NIL) + { + /* + * If the task has a subselect, then we may need to lock the shards from which + * the query selects as well to prevent the subselects from seeing different + * results on different replicas. + * + * ExclusiveLock conflicts with all lock types used by modifications + * and therefore prevents other modifications from running + * concurrently. + */ + LockRelationShardResources(requiresConsistentSnapshotRelationShardList, + ExclusiveLock); + } +} + + /* * RequiresConsistentSnapshot returns true if the given task need to take * the necessary locks to ensure that a subquery in the modify query * returns the same output for all task placements. @@ -188,3 +442,27 @@ LockPartitionRelations(Oid relationId, LOCKMODE lockMode) LockRelationOid(partitionRelationId, lockMode); } } + + +/* + * LockPartitionsForDistributedPlan ensures commands take locks on all partitions + * of a distributed table that appears in the query. We do this primarily out of + * consistency with PostgreSQL locking. + */ +void +LockPartitionsForDistributedPlan(DistributedPlan *plan) +{ + if (TaskListModifiesDatabase(plan->modLevel, plan->workerJob->taskList)) + { + Oid targetRelationId = plan->targetRelationId; + + LockPartitionsInRelationList(list_make1_oid(targetRelationId), RowExclusiveLock); + } + + /* + * Lock partitions of tables that appear in a SELECT or subquery. In the + * DML case this also includes the target relation, but since we already + * have a stronger lock this doesn't do any harm. + */ + LockPartitionsInRelationList(plan->relationIdList, AccessShareLock); +} diff --git a/src/backend/distributed/executor/executor_util_params.c b/src/backend/distributed/executor/executor_util_params.c new file mode 100644 index 000000000..6b5139bff --- /dev/null +++ b/src/backend/distributed/executor/executor_util_params.c @@ -0,0 +1,101 @@ +/*------------------------------------------------------------------------- + * + * executor_util_params.c + * + * Utility functions for handling query parameters during remote execution. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "funcapi.h" +#include "miscadmin.h" + +#include "distributed/executor_util.h" +#include "utils/lsyscache.h" + + +/* + * ExtractParametersForRemoteExecution extracts parameter types and values from + * the given ParamListInfo structure, and fills parameter type and value arrays. + * It changes the oid of custom types to InvalidOid so that they are the same in workers + * and coordinators. + */ +void +ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, + const char ***parameterValues) +{ + ExtractParametersFromParamList(paramListInfo, parameterTypes, + parameterValues, false); +} + + +/* + * ExtractParametersFromParamList extracts parameter types and values from + * the given ParamListInfo structure, and fills parameter type and value arrays. + * If useOriginalCustomTypeOids is true, it uses the original oids for custom types.
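+ * + * For example (an illustrative sketch, not an exhaustive description): a parameter of a user-defined enum type has ptype >= FirstNormalObjectId, so its type is sent as 0 and the worker infers the oid from context; a built-in type such as int4 keeps its original oid; and an unused parameter (ptype == 0) is forwarded as a NULL of type text.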
*/ +void +ExtractParametersFromParamList(ParamListInfo paramListInfo, + Oid **parameterTypes, + const char ***parameterValues, bool + useOriginalCustomTypeOids) +{ + int parameterCount = paramListInfo->numParams; + + *parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid)); + *parameterValues = (const char **) palloc0(parameterCount * sizeof(char *)); + + /* get parameter types and values */ + for (int parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++) + { + ParamExternData *parameterData = &paramListInfo->params[parameterIndex]; + Oid typeOutputFunctionId = InvalidOid; + bool variableLengthType = false; + + /* + * Use 0 for data types where the oid values can be different on + * the coordinator and worker nodes. Therefore, the worker nodes can + * infer the correct oid. + */ + if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids) + { + (*parameterTypes)[parameterIndex] = 0; + } + else + { + (*parameterTypes)[parameterIndex] = parameterData->ptype; + } + + /* + * If the parameter is not referenced / used (ptype == 0) and + * would otherwise have errored out inside standard_planner(), + * don't pass a value to the remote side, and pass text oid to prevent + * undetermined data type errors on workers. + */ + if (parameterData->ptype == 0) + { + (*parameterValues)[parameterIndex] = NULL; + (*parameterTypes)[parameterIndex] = TEXTOID; + + continue; + } + + /* + * If the parameter is NULL then we preserve its type, but + * don't need to evaluate its value. + */ + if (parameterData->isnull) + { + (*parameterValues)[parameterIndex] = NULL; + + continue; + } + + getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId, + &variableLengthType); + + (*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, + parameterData->value); + } +} diff --git a/src/backend/distributed/executor/executor_util_tasks.c b/src/backend/distributed/executor/executor_util_tasks.c new file mode 100644 index 000000000..abf721196 --- /dev/null +++ b/src/backend/distributed/executor/executor_util_tasks.c @@ -0,0 +1,297 @@ +/*------------------------------------------------------------------------- + * + * executor_util_tasks.c + * + * Utility functions for dealing with task lists in the executor. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "funcapi.h" +#include "miscadmin.h" + +#include "distributed/executor_util.h" +#include "distributed/listutils.h" +#include "distributed/shardinterval_utils.h" + + +/* + * TaskListModifiesDatabase is a helper function for DistributedExecutionModifiesDatabase and + * LockPartitionsForDistributedPlan. + */ +bool +TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList) +{ + if (modLevel > ROW_MODIFY_READONLY) + { + return true; + } + + /* + * If we cannot decide by only checking the row modify level, + * we should look more closely at the tasks. + */ + if (list_length(taskList) < 1) + { + /* is this ever possible? */ + return false; + } + + Task *firstTask = (Task *) linitial(taskList); + + return !ReadOnlyTask(firstTask->taskType); +} + + +/* + * TaskListRequiresRollback returns true if the distributed + * execution should start a CoordinatedTransaction. In other words, if the + * function returns true, the execution sends BEGIN; to every connection + * involved in the distributed execution.
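+ * + * A few illustrative cases (a sketch of the intended behavior, not an exhaustive list): a single-task, single-placement, single-query UPDATE outside a transaction block needs no BEGIN; the same UPDATE inside a multi-statement transaction does; a multi-shard UPDATE always does; and a VACUUM task returns false because it cannot run in a transaction block at all.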
+ */ +bool +TaskListRequiresRollback(List *taskList) +{ + int taskCount = list_length(taskList); + + if (taskCount == 0) + { + return false; + } + + Task *task = (Task *) linitial(taskList); + if (task->cannotBeExecutedInTransction) + { + /* vacuum, create index concurrently etc. */ + return false; + } + + bool selectForUpdate = task->relationRowLockList != NIL; + if (selectForUpdate) + { + /* + * Do not check SelectOpensTransactionBlock, always open transaction block + * if SELECT FOR UPDATE is executed inside a distributed transaction. + */ + return IsMultiStatementTransaction(); + } + + if (ReadOnlyTask(task->taskType)) + { + return SelectOpensTransactionBlock && + IsTransactionBlock(); + } + + if (IsMultiStatementTransaction()) + { + return true; + } + + if (list_length(taskList) > 1) + { + return true; + } + + if (list_length(task->taskPlacementList) > 1) + { + /* + * Single DML/DDL tasks with replicated tables (including + * reference and non-reference tables) should require + * BEGIN/COMMIT/ROLLBACK. + */ + return true; + } + + if (task->queryCount > 1) + { + /* + * When there are multiple sequential queries in a task + * we need to run those as a transaction. + */ + return true; + } + + return false; +} + + +/* + * TaskListRequires2PC determines whether the given task list requires 2PC. + */ +bool +TaskListRequires2PC(List *taskList) +{ + if (taskList == NIL) + { + return false; + } + + Task *task = (Task *) linitial(taskList); + if (ReadOnlyTask(task->taskType)) + { + /* we do not trigger 2PC for ReadOnly queries */ + return false; + } + + bool singleTask = list_length(taskList) == 1; + if (singleTask && list_length(task->taskPlacementList) == 1) + { + /* we do not trigger 2PC for modifications that are: + * - single task + * - single placement + */ + return false; + } + + /* + * Otherwise, all modifications are done via 2PC. This includes: + * - Multi-shard commands irrespective of the replication factor + * - Single-shard commands that are targeting more than one replica + */ + return true; +} + + +/* + * TaskListCannotBeExecutedInTransaction returns true if any of the + * tasks in the input cannot be executed in a transaction. These are + * tasks like VACUUM or CREATE INDEX CONCURRENTLY etc. + */ +bool +TaskListCannotBeExecutedInTransaction(List *taskList) +{ + Task *task = NULL; + foreach_ptr(task, taskList) + { + if (task->cannotBeExecutedInTransction) + { + return true; + } + } + + return false; +} + + +/* + * SelectForUpdateOnReferenceTable returns true if the input task + * contains a FOR UPDATE clause that locks any reference tables. + */ +bool +SelectForUpdateOnReferenceTable(List *taskList) +{ + if (list_length(taskList) != 1) + { + /* we currently do not support SELECT FOR UPDATE on multi task queries */ + return false; + } + + Task *task = (Task *) linitial(taskList); + RelationRowLock *relationRowLock = NULL; + foreach_ptr(relationRowLock, task->relationRowLockList) + { + Oid relationId = relationRowLock->relationId; + + if (IsCitusTableType(relationId, REFERENCE_TABLE)) + { + return true; + } + } + + return false; +} + + +/* + * ReadOnlyTask returns true if the input task does a read-only operation + * on the database. 
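+ * + * For instance, a router SELECT is planned as a READ_TASK and is treated as read-only here, while task types that write (by assumption about the remaining TaskType values, e.g. MODIFY_TASK and DDL_TASK) fall through to the default branch and report false.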
+/*
+ * ReadOnlyTask returns true if the input task does a read-only operation
+ * on the database.
+ */
+bool
+ReadOnlyTask(TaskType taskType)
+{
+    switch (taskType)
+    {
+        case READ_TASK:
+        case MAP_OUTPUT_FETCH_TASK:
+        case MAP_TASK:
+        case MERGE_TASK:
+        {
+            return true;
+        }
+
+        default:
+        {
+            return false;
+        }
+    }
+}
+
+
+/*
+ * ModifiedTableReplicated iterates over the task list and returns true
+ * if any task's anchor shard belongs to a replicated table. We qualify
+ * replicated tables as any reference table or any distributed table with
+ * replication factor > 1.
+ */
+bool
+ModifiedTableReplicated(List *taskList)
+{
+    Task *task = NULL;
+    foreach_ptr(task, taskList)
+    {
+        int64 shardId = task->anchorShardId;
+
+        if (shardId == INVALID_SHARD_ID)
+        {
+            continue;
+        }
+
+        if (ReferenceTableShardId(shardId))
+        {
+            return true;
+        }
+
+        Oid relationId = RelationIdForShard(shardId);
+        if (!SingleReplicatedTable(relationId))
+        {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+
+/*
+ * ShouldRunTasksSequentially returns true if each of the individual tasks
+ * should be executed one by one. Note that this is different from the
+ * MultiShardConnectionType == SEQUENTIAL_CONNECTION case. In that case,
+ * running the tasks across the nodes in parallel is acceptable and implemented
+ * in that way.
+ *
+ * However, the executions that are qualified here would perform poorly if the
+ * tasks across the workers were executed in parallel. We currently qualify only
+ * one class of distributed queries here, multi-row INSERTs. If we do not enforce
+ * true sequential execution, concurrent multi-row upserts could easily form
+ * a distributed deadlock when the upserts touch the same rows.
+ */
+bool
+ShouldRunTasksSequentially(List *taskList)
+{
+    if (list_length(taskList) < 2)
+    {
+        /* single task plans are already qualified as sequential by definition */
+        return false;
+    }
+
+    /* all the tasks are the same, so we only look at one */
+    Task *initialTask = (Task *) linitial(taskList);
+    if (initialTask->rowValuesLists != NIL)
+    {
+        /* found a multi-row INSERT */
+        return true;
+    }
+
+    return false;
+}
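The multi-row INSERT case above is the only one ShouldRunTasksSequentially currently recognizes; combined with the replication helpers it lets a caller decide when parallel task execution is unsafe. A hedged sketch of that call-site shape, not part of this patch (name is hypothetical):

/*
 * Editorial sketch: decide whether a task list needs special handling,
 * using the task-list helpers defined above.
 */
#include "postgres.h"

#include "distributed/executor_util.h"

static bool
SketchNeedsCarefulExecution(RowModifyLevel modLevel, List *taskList)
{
    /* multi-row INSERTs must run strictly one task at a time */
    if (ShouldRunTasksSequentially(taskList))
    {
        return true;
    }

    /* writes against replicated shards must keep all placements in sync */
    return TaskListModifiesDatabase(modLevel, taskList) &&
           ModifiedTableReplicated(taskList);
}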
diff --git a/src/backend/distributed/executor/executor_util_tuples.c b/src/backend/distributed/executor/executor_util_tuples.c
new file mode 100644
index 000000000..c5fde9f90
--- /dev/null
+++ b/src/backend/distributed/executor/executor_util_tuples.c
@@ -0,0 +1,129 @@
+/*-------------------------------------------------------------------------
+ *
+ * executor_util_tuples.c
+ *
+ * Utility functions for handling tuples during remote execution.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "distributed/executor_util.h"
+#include "utils/lsyscache.h"
+
+
+/*
+ * TupleDescGetAttBinaryInMetadata - Build an AttInMetadata structure based on
+ * the supplied TupleDesc. AttInMetadata can be used in conjunction with
+ * fmStringInfos containing binary encoded types to produce a properly formed
+ * tuple.
+ *
+ * NOTE: This function is a copy of the PG function TupleDescGetAttInMetadata,
+ * except that it uses getTypeBinaryInputInfo instead of getTypeInputInfo.
+ */
+AttInMetadata *
+TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc)
+{
+    int natts = tupdesc->natts;
+    int i;
+    Oid atttypeid;
+    Oid attinfuncid;
+
+    AttInMetadata *attinmeta = (AttInMetadata *) palloc(sizeof(AttInMetadata));
+
+    /* "Bless" the tupledesc so that we can make rowtype datums with it */
+    attinmeta->tupdesc = BlessTupleDesc(tupdesc);
+
+    /*
+     * Gather info needed later to call the binary-input ("receive") function
+     * for each attribute.
+     */
+    FmgrInfo *attinfuncinfo = (FmgrInfo *) palloc0(natts * sizeof(FmgrInfo));
+    Oid *attioparams = (Oid *) palloc0(natts * sizeof(Oid));
+    int32 *atttypmods = (int32 *) palloc0(natts * sizeof(int32));
+
+    for (i = 0; i < natts; i++)
+    {
+        Form_pg_attribute att = TupleDescAttr(tupdesc, i);
+
+        /* Ignore dropped attributes */
+        if (!att->attisdropped)
+        {
+            atttypeid = att->atttypid;
+            getTypeBinaryInputInfo(atttypeid, &attinfuncid, &attioparams[i]);
+            fmgr_info(attinfuncid, &attinfuncinfo[i]);
+            atttypmods[i] = att->atttypmod;
+        }
+    }
+    attinmeta->attinfuncs = attinfuncinfo;
+    attinmeta->attioparams = attioparams;
+    attinmeta->atttypmods = atttypmods;
+
+    return attinmeta;
+}
+
+
+/*
+ * BuildTupleFromBytes - build a HeapTuple given user data in binary form.
+ * values is an array of StringInfos, one for each attribute of the return
+ * tuple. A NULL StringInfo pointer indicates we want to create a NULL field.
+ *
+ * NOTE: This function is a copy of the PG function BuildTupleFromCStrings,
+ * except that it uses ReceiveFunctionCall instead of InputFunctionCall.
+ */
+HeapTuple
+BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values)
+{
+    TupleDesc tupdesc = attinmeta->tupdesc;
+    int natts = tupdesc->natts;
+    int i;
+
+    Datum *dvalues = (Datum *) palloc(natts * sizeof(Datum));
+    bool *nulls = (bool *) palloc(natts * sizeof(bool));
+
+    /*
+     * Call the receive function for each non-dropped attribute, even for
+     * nulls, to support domains.
+     */
+    for (i = 0; i < natts; i++)
+    {
+        if (!TupleDescAttr(tupdesc, i)->attisdropped)
+        {
+            /* Non-dropped attributes */
+            dvalues[i] = ReceiveFunctionCall(&attinmeta->attinfuncs[i],
+                                             values[i],
+                                             attinmeta->attioparams[i],
+                                             attinmeta->atttypmods[i]);
+            if (values[i] != NULL)
+            {
+                nulls[i] = false;
+            }
+            else
+            {
+                nulls[i] = true;
+            }
+        }
+        else
+        {
+            /* Handle dropped attributes by setting to NULL */
+            dvalues[i] = (Datum) 0;
+            nulls[i] = true;
+        }
+    }
+
+    /*
+     * Form a tuple
+     */
+    HeapTuple tuple = heap_form_tuple(tupdesc, dvalues, nulls);
+
+    /*
+     * Release locally palloc'd space. XXX would probably be good to pfree
+     * values of pass-by-reference datums, as well.
+     */
+    pfree(dvalues);
+    pfree(nulls);
+
+    return tuple;
+}
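Together these two functions mirror PostgreSQL's C-string tuple-building API for binary wire data: build the AttInMetadata once per result shape, then convert each row of binary column buffers into a HeapTuple. A hedged usage sketch, not part of this patch (name is hypothetical):

/*
 * Editorial sketch: turn one row of binary-encoded column buffers into a
 * HeapTuple. In practice the AttInMetadata would be built once per result
 * set, not once per row.
 */
#include "postgres.h"

#include "distributed/executor_util.h"

static HeapTuple
SketchRowFromBinaryColumns(TupleDesc tupleDescriptor, fmStringInfo *columnBuffers)
{
    AttInMetadata *attInMetadata = TupleDescGetAttBinaryInMetadata(tupleDescriptor);

    /* NULL entries in columnBuffers become NULL attributes */
    return BuildTupleFromBytes(attInMetadata, columnBuffers);
}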
diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index ffd063ca0..d946e15c8 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -90,6 +90,7 @@
 #include "distributed/local_executor.h"
 #include "distributed/local_plan_cache.h"
 #include "distributed/coordinator_protocol.h"
+#include "distributed/executor_util.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_server_executor.h"
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 98ffe1b7d..b25da1ebd 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -1670,6 +1670,48 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
 }
 
 
+/*
+ * LookupTaskPlacementHostAndPort looks up the node name and port for the
+ * given task placement and returns them via the output parameters.
+ */
+void
+LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName,
+                               int *nodePort)
+{
+    if (IsDummyPlacement(taskPlacement))
+    {
+        /*
+         * If we create a dummy placement for the local node, it is possible
+         * that the entry doesn't exist in pg_dist_node, hence a lookup will fail.
+         * In that case we want to use the dummy placement's values.
+         */
+        *nodeName = taskPlacement->nodeName;
+        *nodePort = taskPlacement->nodePort;
+    }
+    else
+    {
+        /*
+         * We want to look up the node information again, since pg_dist_node
+         * may have changed and LookupNodeForGroup processes those
+         * invalidations for us.
+         */
+        WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId);
+        *nodeName = workerNode->workerName;
+        *nodePort = workerNode->workerPort;
+    }
+}
+
+
+/*
+ * IsDummyPlacement returns true if the given placement is a dummy placement.
+ */
+bool
+IsDummyPlacement(ShardPlacement *taskPlacement)
+{
+    return taskPlacement->nodeId == LOCAL_NODE_ID;
+}
+
+
 /*
  * InsertShardRow opens the shard system catalog, and inserts a new row with the
  * given values into that system catalog. Note that we allow the user to pass in
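The two helpers above centralize placement-to-node resolution: a regular placement is re-resolved through pg_dist_node, while a dummy local placement (nodeId == LOCAL_NODE_ID) falls back to the values stored on the placement itself. A hedged sketch of a consumer, not part of this patch (name and log message are hypothetical):

/*
 * Editorial sketch: resolve a task placement's connection target while
 * honoring dummy local placements that may have no pg_dist_node entry.
 */
#include "postgres.h"

#include "distributed/metadata_utility.h"

static void
SketchLogPlacementTarget(ShardPlacement *taskPlacement)
{
    char *nodeName = NULL;
    int nodePort = 0;

    LookupTaskPlacementHostAndPort(taskPlacement, &nodeName, &nodePort);

    elog(DEBUG1, "placement connects to %s:%d%s", nodeName, nodePort,
         IsDummyPlacement(taskPlacement) ? " (dummy local placement)" : "");
}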
diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c
index 3d0fe617e..c23509df1 100644
--- a/src/backend/distributed/planner/multi_explain.c
+++ b/src/backend/distributed/planner/multi_explain.c
@@ -29,6 +29,7 @@
 #include "distributed/citus_nodefuncs.h"
 #include "distributed/connection_management.h"
 #include "distributed/deparse_shard_query.h"
+#include "distributed/executor_util.h"
 #include "distributed/insert_select_planner.h"
 #include "distributed/insert_select_executor.h"
 #include "distributed/listutils.h"
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 599810b5c..c3677bb1a 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -28,6 +28,7 @@
 #include "distributed/deparse_shard_query.h"
 #include "distributed/distribution_column.h"
 #include "distributed/errormessage.h"
+#include "distributed/executor_util.h"
 #include "distributed/log_utils.h"
 #include "distributed/insert_select_planner.h"
 #include "distributed/intermediate_result_pruning.h"
diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h
index 0a4735ee7..7198858a0 100644
--- a/src/include/distributed/adaptive_executor.h
+++ b/src/include/distributed/adaptive_executor.h
@@ -14,7 +14,6 @@ extern int ExecutorSlowStartInterval;
 extern bool EnableCostBasedConnectionEstablishment;
 extern bool PreventIncompleteConnectionEstablishment;
 
-extern bool ShouldRunTasksSequentially(List *taskList);
 extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList);
 extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
 extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index 278d7ca2d..8c2584451 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -54,6 +54,19 @@
 #define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
 #define WAIT_EVENT_SET_INDEX_FAILED -2
 
+/*
+ * UINT32_MAX is reserved in pg_dist_node, so we can use it safely.
+ */
+#define LOCAL_NODE_ID UINT32_MAX
+
+/*
+ * To connect to the current node, use `LocalHostName`, which is a GUC,
+ * instead of this hardcoded loopback hostname. Use this define only if
+ * you really need the loopback hostname.
+ */ +#define LOCAL_HOST_NAME "localhost" + + /* forward declare, to avoid forcing large headers on everyone */ struct pg_conn; /* target of the PGconn typedef */ struct MemoryContextData; diff --git a/src/include/distributed/distributed_execution_locks.h b/src/include/distributed/distributed_execution_locks.h index b274cd459..e789843ae 100644 --- a/src/include/distributed/distributed_execution_locks.h +++ b/src/include/distributed/distributed_execution_locks.h @@ -16,10 +16,14 @@ #include "storage/lockdefs.h" #include "distributed/multi_physical_planner.h" +extern void AcquireExecutorShardLocksForExecution(RowModifyLevel modLevel, + List *taskList); extern void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList); extern bool RequiresConsistentSnapshot(Task *task); extern void AcquireMetadataLocks(List *taskList); extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode); extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode); +extern void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan); + #endif /* DISTRIBUTED_EXECUTION_LOCKS_H */ diff --git a/src/include/distributed/executor_util.h b/src/include/distributed/executor_util.h new file mode 100644 index 000000000..8560c6dfd --- /dev/null +++ b/src/include/distributed/executor_util.h @@ -0,0 +1,44 @@ +/*------------------------------------------------------------------------- + * + * executor_util.h + * Utility functions for executing task lists. + * + *------------------------------------------------------------------------- + */ +#ifndef EXECUTOR_UTIL_H +#define EXECUTOR_UTIL_H + +#include "fmgr.h" +#include "funcapi.h" + +#include "access/tupdesc.h" +#include "distributed/multi_physical_planner.h" +#include "nodes/params.h" +#include "nodes/pg_list.h" + + +/* utility functions for dealing with tasks in the executor */ +extern bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); +extern bool TaskListRequiresRollback(List *taskList); +extern bool TaskListRequires2PC(List *taskList); +extern bool TaskListCannotBeExecutedInTransaction(List *taskList); +extern bool SelectForUpdateOnReferenceTable(List *taskList); +extern bool ReadOnlyTask(TaskType taskType); +extern bool ModifiedTableReplicated(List *taskList); +extern bool ShouldRunTasksSequentially(List *taskList); + +/* utility functions for handling parameters in the executor */ +extern void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, + Oid **parameterTypes, + const char ***parameterValues); +extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, + Oid **parameterTypes, + const char ***parameterValues, bool + useOriginalCustomTypeOids); + +/* utility functions for processing tuples in the executor */ +extern AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc); +extern HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values); + + +#endif /* EXECUTOR_UTIL_H */ diff --git a/src/include/distributed/intermediate_result_pruning.h b/src/include/distributed/intermediate_result_pruning.h index 5880cd23c..fcd6027ef 100644 --- a/src/include/distributed/intermediate_result_pruning.h +++ b/src/include/distributed/intermediate_result_pruning.h @@ -13,18 +13,6 @@ #include "distributed/subplan_execution.h" -/* - * UINT32_MAX is reserved in pg_dist_node, so we can use it safely. 
- */ -#define LOCAL_NODE_ID UINT32_MAX - -/* - * If you want to connect to the current node use `LocalHostName`, which is a GUC, instead - * of the hardcoded loopback hostname. Only if you really need the loopback hostname use - * this define. - */ -#define LOCAL_HOST_NAME "localhost" - extern bool LogIntermediateResults; extern List * FindSubPlanUsages(DistributedPlan *plan); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index c555f1f82..2248e3f58 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -14,7 +14,6 @@ #include "distributed/citus_custom_scan.h" #include "distributed/tuple_destination.h" - /* enabled with GUCs*/ extern bool EnableLocalExecution; extern bool LogLocalCommands; diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 64d2e3557..f7b2038ee 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -299,6 +299,9 @@ extern WorkerNode * ActiveShardPlacementWorkerNode(uint64 shardId); extern List * BuildShardPlacementList(int64 shardId); extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); +extern void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName, + int *nodePort); +extern bool IsDummyPlacement(ShardPlacement *taskPlacement); extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, SizeQueryType sizeQueryType, bool optimizePartitionCalculations); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index b9f272d0a..4e7f13601 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -144,13 +144,6 @@ extern void SetLocalMultiShardModifyModeToSequential(void); extern void EnsureSequentialMode(ObjectType objType); extern void SetLocalForceMaxQueryParallelization(void); extern void SortTupleStore(CitusScanState *scanState); -extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan); -extern bool ReadOnlyTask(TaskType taskType); -extern bool TaskListCannotBeExecutedInTransaction(List *taskList); -extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, - Oid **parameterTypes, - const char ***parameterValues, bool - useOriginalCustomTypeOids); extern ParamListInfo ExecutorBoundParams(void); extern void EnsureTaskExecutionAllowed(bool isRemote);
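With the extern declarations relocated, translation units that only need these helpers can include distributed/executor_util.h instead of pulling in multi_executor.h or adaptive_executor.h. A minimal hypothetical consumer, not part of this patch (the function is illustrative only):

/*
 * Editorial sketch: a consumer of the new header that checks whether a
 * task list is a plain read requiring no transaction block.
 */
#include "postgres.h"

#include "distributed/executor_util.h"

bool
SketchTaskListIsPlainRead(List *taskList)
{
    return taskList != NIL &&
           !TaskListModifiesDatabase(ROW_MODIFY_READONLY, taskList) &&
           !TaskListRequiresRollback(taskList);
}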