Refactor executor utility functions into multiple files (#6593)

Co-authored-by: Marco Slot <marco.slot@gmail.com>
pull/6814/head
Marco Slot 2023-03-31 13:07:48 +02:00 committed by GitHub
parent 085b59f586
commit 343d1c5072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1028 additions and 972 deletions

View File

@ -53,6 +53,7 @@
#include "distributed/coordinator_protocol.h"
#include "distributed/deparser.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/executor_util.h"
#include "distributed/foreign_key_relationship.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"

File diff suppressed because it is too large Load Diff

View File

@ -9,6 +9,7 @@
*-------------------------------------------------------------------------
*/
#include "distributed/distributed_execution_locks.h"
#include "distributed/executor_util.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
@ -19,6 +20,259 @@
#include "distributed/transaction_management.h"
/*
* AcquireExecutorShardLocksForExecution acquires advisory lock on shard IDs
* to prevent unsafe concurrent modifications of shards.
*
* We prevent concurrent modifications of shards in two cases:
* 1. Any non-commutative writes to a replicated table
* 2. Multi-shard writes that are executed in parallel
*
* The first case ensures we do not apply updates in different orders on
* different replicas (e.g. of a reference table), which could lead the
* replicas to diverge.
*
* The second case prevents deadlocks due to out-of-order execution.
*
* There are two GUCs that can override the default behaviors.
* 'citus.all_modifications_commutative' relaxes locking
* that's done for the purpose of keeping replicas consistent.
* 'citus.enable_deadlock_prevention' relaxes locking done for
* the purpose of avoiding deadlocks between concurrent
* multi-shard commands.
*
* We do not take executor shard locks for utility commands such as
* TRUNCATE because the table locks already prevent concurrent access.
*/
void
AcquireExecutorShardLocksForExecution(RowModifyLevel modLevel, List *taskList)
{
if (modLevel <= ROW_MODIFY_READONLY &&
!SelectForUpdateOnReferenceTable(taskList))
{
/*
* Executor locks only apply to DML commands and SELECT FOR UPDATE queries
* touching reference tables.
*/
return;
}
bool requiresParallelExecutionLocks =
!(list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList));
bool modifiedTableReplicated = ModifiedTableReplicated(taskList);
if (!modifiedTableReplicated && !requiresParallelExecutionLocks)
{
/*
* When a distributed query on tables with replication
* factor == 1 and command hits only a single shard, we
* rely on Postgres to handle the serialization of the
* concurrent modifications on the workers.
*
* For reference tables, even if their placements are replicated
* ones (e.g., single node), we acquire the distributed execution
* locks to be consistent when new node(s) are added. So, they
* do not return at this point.
*/
return;
}
/*
* We first assume that all the remaining modifications are going to
* be serialized. So, start with an ExclusiveLock and lower the lock level
* as much as possible.
*/
int lockMode = ExclusiveLock;
/*
* In addition to honouring commutativity rules, we currently only
* allow a single multi-shard command on a shard at a time. Otherwise,
* concurrent multi-shard commands may take row-level locks on the
* shard placements in a different order and create a distributed
* deadlock. This applies even when writes are commutative and/or
* there is no replication. This can be relaxed via
* EnableDeadlockPrevention.
*
* 1. If citus.all_modifications_commutative is set to true, then all locks
* are acquired as RowExclusiveLock.
*
* 2. If citus.all_modifications_commutative is false, then only the shards
* with more than one replicas are locked with ExclusiveLock. Otherwise, the
* lock is acquired with ShareUpdateExclusiveLock.
*
* ShareUpdateExclusiveLock conflicts with itself such that only one
* multi-shard modification at a time is allowed on a shard. It also conflicts
* with ExclusiveLock, which ensures that updates/deletes/upserts are applied
* in the same order on all placements. It does not conflict with
* RowExclusiveLock, which is normally obtained by single-shard, commutative
* writes.
*/
if (!modifiedTableReplicated && requiresParallelExecutionLocks)
{
/*
* When there is no replication then we only need to prevent
* concurrent multi-shard commands on the same shards. This is
* because concurrent, parallel commands may modify the same
* set of shards, but in different orders. The order of the
* accesses might trigger distributed deadlocks that are not
* possible to happen on non-distributed systems such
* regular Postgres.
*
* As an example, assume that we have two queries: query-1 and query-2.
* Both queries access shard-1 and shard-2. If query-1 first accesses to
* shard-1 then shard-2, and query-2 accesses shard-2 then shard-1, these
* two commands might block each other in case they modify the same rows
* (e.g., cause distributed deadlocks).
*
* In either case, ShareUpdateExclusive has the desired effect, since
* it conflicts with itself and ExclusiveLock (taken by non-commutative
* writes).
*
* However, some users find this too restrictive, so we allow them to
* reduce to a RowExclusiveLock when citus.enable_deadlock_prevention
* is enabled, which lets multi-shard modifications run in parallel as
* long as they all disable the GUC.
*/
lockMode =
EnableDeadlockPrevention ? ShareUpdateExclusiveLock : RowExclusiveLock;
if (!IsCoordinator())
{
/*
* We also skip taking a heavy-weight lock when running a multi-shard
* commands from workers, since we currently do not prevent concurrency
* across workers anyway.
*/
lockMode = RowExclusiveLock;
}
}
else if (modifiedTableReplicated)
{
/*
* When we are executing distributed queries on replicated tables, our
* default behaviour is to prevent any concurrency. This is valid
* for when parallel execution is happening or not.
*
* The reason is that we cannot control the order of the placement accesses
* of two distributed queries to the same shards. The order of the accesses
* might cause the replicas of the same shard placements diverge. This is
* not possible to happen on non-distributed systems such regular Postgres.
*
* As an example, assume that we have two queries: query-1 and query-2.
* Both queries only access the placements of shard-1, say p-1 and p-2.
*
* And, assume that these queries are non-commutative, such as:
* query-1: UPDATE table SET b = 1 WHERE key = 1;
* query-2: UPDATE table SET b = 2 WHERE key = 1;
*
* If query-1 accesses to p-1 then p-2, and query-2 accesses
* p-2 then p-1, these two commands would leave the p-1 and p-2
* diverged (e.g., the values for the column "b" would be different).
*
* The only exception to this rule is the single shard commutative
* modifications, such as INSERTs. In that case, we can allow
* concurrency among such backends, hence lowering the lock level
* to RowExclusiveLock.
*/
if (!requiresParallelExecutionLocks && modLevel < ROW_MODIFY_NONCOMMUTATIVE)
{
lockMode = RowExclusiveLock;
}
}
if (AllModificationsCommutative)
{
/*
* The mapping is overridden when all_modifications_commutative is set to true.
* In that case, all modifications are treated as commutative, which can be used
* to communicate that the application is only generating commutative
* UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary. This
* is irrespective of single-shard/multi-shard or replicated tables.
*/
lockMode = RowExclusiveLock;
}
/* now, iterate on the tasks and acquire the executor locks on the shards */
List *anchorShardIntervalList = NIL;
List *relationRowLockList = NIL;
List *requiresConsistentSnapshotRelationShardList = NIL;
Task *task = NULL;
foreach_ptr(task, taskList)
{
ShardInterval *anchorShardInterval = LoadShardInterval(task->anchorShardId);
anchorShardIntervalList = lappend(anchorShardIntervalList, anchorShardInterval);
/* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */
AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList);
relationRowLockList =
list_concat(relationRowLockList,
task->relationRowLockList);
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas.
*/
if (RequiresConsistentSnapshot(task))
{
/*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
requiresConsistentSnapshotRelationShardList =
list_concat(requiresConsistentSnapshotRelationShardList,
task->relationShardList);
}
}
/*
* Acquire the locks in a sorted way to avoid deadlocks due to lock
* ordering across concurrent sessions.
*/
anchorShardIntervalList =
SortList(anchorShardIntervalList, CompareShardIntervalsById);
/*
* If we are dealing with a partition we are also taking locks on parent table
* to prevent deadlocks on concurrent operations on a partition and its parent.
*
* Note that this function currently does not acquire any remote locks as that
* is necessary to control the concurrency across multiple nodes for replicated
* tables. That is because Citus currently does not allow modifications to
* partitions from any node other than the coordinator.
*/
LockParentShardResourceIfPartition(anchorShardIntervalList, lockMode);
/* Acquire distribution execution locks on the affected shards */
SerializeNonCommutativeWrites(anchorShardIntervalList, lockMode);
if (relationRowLockList != NIL)
{
/* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */
AcquireExecutorShardLocksForRelationRowLockList(relationRowLockList);
}
if (requiresConsistentSnapshotRelationShardList != NIL)
{
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas.
*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
LockRelationShardResources(requiresConsistentSnapshotRelationShardList,
ExclusiveLock);
}
}
/*
* RequiresConsistentSnapshot returns true if the given task need to take
* the necessary locks to ensure that a subquery in the modify query
@ -188,3 +442,27 @@ LockPartitionRelations(Oid relationId, LOCKMODE lockMode)
LockRelationOid(partitionRelationId, lockMode);
}
}
/*
* LockPartitionsForDistributedPlan ensures commands take locks on all partitions
* of a distributed table that appears in the query. We do this primarily out of
* consistency with PostgreSQL locking.
*/
void
LockPartitionsForDistributedPlan(DistributedPlan *plan)
{
if (TaskListModifiesDatabase(plan->modLevel, plan->workerJob->taskList))
{
Oid targetRelationId = plan->targetRelationId;
LockPartitionsInRelationList(list_make1_oid(targetRelationId), RowExclusiveLock);
}
/*
* Lock partitions of tables that appear in a SELECT or subquery. In the
* DML case this also includes the target relation, but since we already
* have a stronger lock this doesn't do any harm.
*/
LockPartitionsInRelationList(plan->relationIdList, AccessShareLock);
}

View File

@ -0,0 +1,101 @@
/*-------------------------------------------------------------------------
*
* executor_util_tasks.c
*
* Utility functions for dealing with task lists in the executor.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "distributed/executor_util.h"
#include "utils/lsyscache.h"
/*
* ExtractParametersForRemoteExecution extracts parameter types and values from
* the given ParamListInfo structure, and fills parameter type and value arrays.
* It changes oid of custom types to InvalidOid so that they are the same in workers
* and coordinators.
*/
void
ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes,
const char ***parameterValues)
{
ExtractParametersFromParamList(paramListInfo, parameterTypes,
parameterValues, false);
}
/*
* ExtractParametersFromParamList extracts parameter types and values from
* the given ParamListInfo structure, and fills parameter type and value arrays.
* If useOriginalCustomTypeOids is true, it uses the original oids for custom types.
*/
void
ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool
useOriginalCustomTypeOids)
{
int parameterCount = paramListInfo->numParams;
*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));
/* get parameter types and values */
for (int parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++)
{
ParamExternData *parameterData = &paramListInfo->params[parameterIndex];
Oid typeOutputFunctionId = InvalidOid;
bool variableLengthType = false;
/*
* Use 0 for data types where the oid values can be different on
* the coordinator and worker nodes. Therefore, the worker nodes can
* infer the correct oid.
*/
if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids)
{
(*parameterTypes)[parameterIndex] = 0;
}
else
{
(*parameterTypes)[parameterIndex] = parameterData->ptype;
}
/*
* If the parameter is not referenced / used (ptype == 0) and
* would otherwise have errored out inside standard_planner()),
* don't pass a value to the remote side, and pass text oid to prevent
* undetermined data type errors on workers.
*/
if (parameterData->ptype == 0)
{
(*parameterValues)[parameterIndex] = NULL;
(*parameterTypes)[parameterIndex] = TEXTOID;
continue;
}
/*
* If the parameter is NULL then we preserve its type, but
* don't need to evaluate its value.
*/
if (parameterData->isnull)
{
(*parameterValues)[parameterIndex] = NULL;
continue;
}
getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId,
&variableLengthType);
(*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId,
parameterData->value);
}
}

View File

@ -0,0 +1,297 @@
/*-------------------------------------------------------------------------
*
* executor_util_tasks.c
*
* Utility functions for dealing with task lists in the executor.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "distributed/executor_util.h"
#include "distributed/listutils.h"
#include "distributed/shardinterval_utils.h"
/*
* TaskListModifiesDatabase is a helper function for DistributedExecutionModifiesDatabase and
* DistributedPlanModifiesDatabase.
*/
bool
TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList)
{
if (modLevel > ROW_MODIFY_READONLY)
{
return true;
}
/*
* If we cannot decide by only checking the row modify level,
* we should look closer to the tasks.
*/
if (list_length(taskList) < 1)
{
/* is this ever possible? */
return false;
}
Task *firstTask = (Task *) linitial(taskList);
return !ReadOnlyTask(firstTask->taskType);
}
/*
* TaskListRequiresRollback returns true if the distributed
* execution should start a CoordinatedTransaction. In other words, if the
* function returns true, the execution sends BEGIN; to every connection
* involved in the distributed execution.
*/
bool
TaskListRequiresRollback(List *taskList)
{
int taskCount = list_length(taskList);
if (taskCount == 0)
{
return false;
}
Task *task = (Task *) linitial(taskList);
if (task->cannotBeExecutedInTransction)
{
/* vacuum, create index concurrently etc. */
return false;
}
bool selectForUpdate = task->relationRowLockList != NIL;
if (selectForUpdate)
{
/*
* Do not check SelectOpensTransactionBlock, always open transaction block
* if SELECT FOR UPDATE is executed inside a distributed transaction.
*/
return IsMultiStatementTransaction();
}
if (ReadOnlyTask(task->taskType))
{
return SelectOpensTransactionBlock &&
IsTransactionBlock();
}
if (IsMultiStatementTransaction())
{
return true;
}
if (list_length(taskList) > 1)
{
return true;
}
if (list_length(task->taskPlacementList) > 1)
{
/*
* Single DML/DDL tasks with replicated tables (including
* reference and non-reference tables) should require
* BEGIN/COMMIT/ROLLBACK.
*/
return true;
}
if (task->queryCount > 1)
{
/*
* When there are multiple sequential queries in a task
* we need to run those as a transaction.
*/
return true;
}
return false;
}
/*
* TaskListRequires2PC determines whether the given task list requires 2PC.
*/
bool
TaskListRequires2PC(List *taskList)
{
if (taskList == NIL)
{
return false;
}
Task *task = (Task *) linitial(taskList);
if (ReadOnlyTask(task->taskType))
{
/* we do not trigger 2PC for ReadOnly queries */
return false;
}
bool singleTask = list_length(taskList) == 1;
if (singleTask && list_length(task->taskPlacementList) == 1)
{
/* we do not trigger 2PC for modifications that are:
* - single task
* - single placement
*/
return false;
}
/*
* Otherwise, all modifications are done via 2PC. This includes:
* - Multi-shard commands irrespective of the replication factor
* - Single-shard commands that are targeting more than one replica
*/
return true;
}
/*
* TaskListCannotBeExecutedInTransaction returns true if any of the
* tasks in the input cannot be executed in a transaction. These are
* tasks like VACUUM or CREATE INDEX CONCURRENTLY etc.
*/
bool
TaskListCannotBeExecutedInTransaction(List *taskList)
{
Task *task = NULL;
foreach_ptr(task, taskList)
{
if (task->cannotBeExecutedInTransction)
{
return true;
}
}
return false;
}
/*
* SelectForUpdateOnReferenceTable returns true if the input task
* contains a FOR UPDATE clause that locks any reference tables.
*/
bool
SelectForUpdateOnReferenceTable(List *taskList)
{
if (list_length(taskList) != 1)
{
/* we currently do not support SELECT FOR UPDATE on multi task queries */
return false;
}
Task *task = (Task *) linitial(taskList);
RelationRowLock *relationRowLock = NULL;
foreach_ptr(relationRowLock, task->relationRowLockList)
{
Oid relationId = relationRowLock->relationId;
if (IsCitusTableType(relationId, REFERENCE_TABLE))
{
return true;
}
}
return false;
}
/*
* ReadOnlyTask returns true if the input task does a read-only operation
* on the database.
*/
bool
ReadOnlyTask(TaskType taskType)
{
switch (taskType)
{
case READ_TASK:
case MAP_OUTPUT_FETCH_TASK:
case MAP_TASK:
case MERGE_TASK:
{
return true;
}
default:
{
return false;
}
}
}
/*
* ModifiedTableReplicated iterates on the task list and returns true
* if any of the tasks' anchor shard is a replicated table. We qualify
* replicated tables as any reference table or any distributed table with
* replication factor > 1.
*/
bool
ModifiedTableReplicated(List *taskList)
{
Task *task = NULL;
foreach_ptr(task, taskList)
{
int64 shardId = task->anchorShardId;
if (shardId == INVALID_SHARD_ID)
{
continue;
}
if (ReferenceTableShardId(shardId))
{
return true;
}
Oid relationId = RelationIdForShard(shardId);
if (!SingleReplicatedTable(relationId))
{
return true;
}
}
return false;
}
/*
* ShouldRunTasksSequentially returns true if each of the individual tasks
* should be executed one by one. Note that this is different than
* MultiShardConnectionType == SEQUENTIAL_CONNECTION case. In that case,
* running the tasks across the nodes in parallel is acceptable and implemented
* in that way.
*
* However, the executions that are qualified here would perform poorly if the
* tasks across the workers are executed in parallel. We currently qualify only
* one class of distributed queries here, multi-row INSERTs. If we do not enforce
* true sequential execution, concurrent multi-row upserts could easily form
* a distributed deadlock when the upserts touch the same rows.
*/
bool
ShouldRunTasksSequentially(List *taskList)
{
if (list_length(taskList) < 2)
{
/* single task plans are already qualified as sequential by definition */
return false;
}
/* all the tasks are the same, so we only look one */
Task *initialTask = (Task *) linitial(taskList);
if (initialTask->rowValuesLists != NIL)
{
/* found a multi-row INSERT */
return true;
}
return false;
}

View File

@ -0,0 +1,129 @@
/*-------------------------------------------------------------------------
*
* executor_util_tuples.c
*
* Utility functions for handling tuples during remote execution.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "distributed/executor_util.h"
#include "utils/lsyscache.h"
/*
* TupleDescGetAttBinaryInMetadata - Build an AttInMetadata structure based on
* the supplied TupleDesc. AttInMetadata can be used in conjunction with
* fmStringInfos containing binary encoded types to produce a properly formed
* tuple.
*
* NOTE: This function is a copy of the PG function TupleDescGetAttInMetadata,
* except that it uses getTypeBinaryInputInfo instead of getTypeInputInfo.
*/
AttInMetadata *
TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc)
{
int natts = tupdesc->natts;
int i;
Oid atttypeid;
Oid attinfuncid;
AttInMetadata *attinmeta = (AttInMetadata *) palloc(sizeof(AttInMetadata));
/* "Bless" the tupledesc so that we can make rowtype datums with it */
attinmeta->tupdesc = BlessTupleDesc(tupdesc);
/*
* Gather info needed later to call the "in" function for each attribute
*/
FmgrInfo *attinfuncinfo = (FmgrInfo *) palloc0(natts * sizeof(FmgrInfo));
Oid *attioparams = (Oid *) palloc0(natts * sizeof(Oid));
int32 *atttypmods = (int32 *) palloc0(natts * sizeof(int32));
for (i = 0; i < natts; i++)
{
Form_pg_attribute att = TupleDescAttr(tupdesc, i);
/* Ignore dropped attributes */
if (!att->attisdropped)
{
atttypeid = att->atttypid;
getTypeBinaryInputInfo(atttypeid, &attinfuncid, &attioparams[i]);
fmgr_info(attinfuncid, &attinfuncinfo[i]);
atttypmods[i] = att->atttypmod;
}
}
attinmeta->attinfuncs = attinfuncinfo;
attinmeta->attioparams = attioparams;
attinmeta->atttypmods = atttypmods;
return attinmeta;
}
/*
* BuildTupleFromBytes - build a HeapTuple given user data in binary form.
* values is an array of StringInfos, one for each attribute of the return
* tuple. A NULL StringInfo pointer indicates we want to create a NULL field.
*
* NOTE: This function is a copy of the PG function BuildTupleFromCStrings,
* except that it uses ReceiveFunctionCall instead of InputFunctionCall.
*/
HeapTuple
BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values)
{
TupleDesc tupdesc = attinmeta->tupdesc;
int natts = tupdesc->natts;
int i;
Datum *dvalues = (Datum *) palloc(natts * sizeof(Datum));
bool *nulls = (bool *) palloc(natts * sizeof(bool));
/*
* Call the "in" function for each non-dropped attribute, even for nulls,
* to support domains.
*/
for (i = 0; i < natts; i++)
{
if (!TupleDescAttr(tupdesc, i)->attisdropped)
{
/* Non-dropped attributes */
dvalues[i] = ReceiveFunctionCall(&attinmeta->attinfuncs[i],
values[i],
attinmeta->attioparams[i],
attinmeta->atttypmods[i]);
if (values[i] != NULL)
{
nulls[i] = false;
}
else
{
nulls[i] = true;
}
}
else
{
/* Handle dropped attributes by setting to NULL */
dvalues[i] = (Datum) 0;
nulls[i] = true;
}
}
/*
* Form a tuple
*/
HeapTuple tuple = heap_form_tuple(tupdesc, dvalues, nulls);
/*
* Release locally palloc'd space. XXX would probably be good to pfree
* values of pass-by-reference datums, as well.
*/
pfree(dvalues);
pfree(nulls);
return tuple;
}

View File

@ -90,6 +90,7 @@
#include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/executor_util.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h"

View File

@ -1670,6 +1670,48 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
}
/*
* LookupTaskPlacementHostAndPort sets the nodename and nodeport for the given task placement
* with a lookup.
*/
void
LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName,
int *nodePort)
{
if (IsDummyPlacement(taskPlacement))
{
/*
* If we create a dummy placement for the local node, it is possible
* that the entry doesn't exist in pg_dist_node, hence a lookup will fail.
* In that case we want to use the dummy placements values.
*/
*nodeName = taskPlacement->nodeName;
*nodePort = taskPlacement->nodePort;
}
else
{
/*
* We want to lookup the node information again since it is possible that
* there were changes in pg_dist_node and we will get those invalidations
* in LookupNodeForGroup.
*/
WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId);
*nodeName = workerNode->workerName;
*nodePort = workerNode->workerPort;
}
}
/*
* IsDummyPlacement returns true if the given placement is a dummy placement.
*/
bool
IsDummyPlacement(ShardPlacement *taskPlacement)
{
return taskPlacement->nodeId == LOCAL_NODE_ID;
}
/*
* InsertShardRow opens the shard system catalog, and inserts a new row with the
* given values into that system catalog. Note that we allow the user to pass in

View File

@ -29,6 +29,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/executor_util.h"
#include "distributed/insert_select_planner.h"
#include "distributed/insert_select_executor.h"
#include "distributed/listutils.h"

View File

@ -28,6 +28,7 @@
#include "distributed/deparse_shard_query.h"
#include "distributed/distribution_column.h"
#include "distributed/errormessage.h"
#include "distributed/executor_util.h"
#include "distributed/log_utils.h"
#include "distributed/insert_select_planner.h"
#include "distributed/intermediate_result_pruning.h"

View File

@ -14,7 +14,6 @@ extern int ExecutorSlowStartInterval;
extern bool EnableCostBasedConnectionEstablishment;
extern bool PreventIncompleteConnectionEstablishment;
extern bool ShouldRunTasksSequentially(List *taskList);
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,

View File

@ -54,6 +54,19 @@
#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
#define WAIT_EVENT_SET_INDEX_FAILED -2
/*
* UINT32_MAX is reserved in pg_dist_node, so we can use it safely.
*/
#define LOCAL_NODE_ID UINT32_MAX
/*
* If you want to connect to the current node use `LocalHostName`, which is a GUC, instead
* of the hardcoded loopback hostname. Only if you really need the loopback hostname use
* this define.
*/
#define LOCAL_HOST_NAME "localhost"
/* forward declare, to avoid forcing large headers on everyone */
struct pg_conn; /* target of the PGconn typedef */
struct MemoryContextData;

View File

@ -16,10 +16,14 @@
#include "storage/lockdefs.h"
#include "distributed/multi_physical_planner.h"
extern void AcquireExecutorShardLocksForExecution(RowModifyLevel modLevel,
List *taskList);
extern void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList);
extern bool RequiresConsistentSnapshot(Task *task);
extern void AcquireMetadataLocks(List *taskList);
extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode);
extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);
extern void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
#endif /* DISTRIBUTED_EXECUTION_LOCKS_H */

View File

@ -0,0 +1,44 @@
/*-------------------------------------------------------------------------
*
* executor_util.h
* Utility functions for executing task lists.
*
*-------------------------------------------------------------------------
*/
#ifndef EXECUTOR_UTIL_H
#define EXECUTOR_UTIL_H
#include "fmgr.h"
#include "funcapi.h"
#include "access/tupdesc.h"
#include "distributed/multi_physical_planner.h"
#include "nodes/params.h"
#include "nodes/pg_list.h"
/* utility functions for dealing with tasks in the executor */
extern bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
extern bool TaskListRequiresRollback(List *taskList);
extern bool TaskListRequires2PC(List *taskList);
extern bool TaskListCannotBeExecutedInTransaction(List *taskList);
extern bool SelectForUpdateOnReferenceTable(List *taskList);
extern bool ReadOnlyTask(TaskType taskType);
extern bool ModifiedTableReplicated(List *taskList);
extern bool ShouldRunTasksSequentially(List *taskList);
/* utility functions for handling parameters in the executor */
extern void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool
useOriginalCustomTypeOids);
/* utility functions for processing tuples in the executor */
extern AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
extern HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
#endif /* EXECUTOR_UTIL_H */

View File

@ -13,18 +13,6 @@
#include "distributed/subplan_execution.h"
/*
* UINT32_MAX is reserved in pg_dist_node, so we can use it safely.
*/
#define LOCAL_NODE_ID UINT32_MAX
/*
* If you want to connect to the current node use `LocalHostName`, which is a GUC, instead
* of the hardcoded loopback hostname. Only if you really need the loopback hostname use
* this define.
*/
#define LOCAL_HOST_NAME "localhost"
extern bool LogIntermediateResults;
extern List * FindSubPlanUsages(DistributedPlan *plan);

View File

@ -14,7 +14,6 @@
#include "distributed/citus_custom_scan.h"
#include "distributed/tuple_destination.h"
/* enabled with GUCs*/
extern bool EnableLocalExecution;
extern bool LogLocalCommands;

View File

@ -299,6 +299,9 @@ extern WorkerNode * ActiveShardPlacementWorkerNode(uint64 shardId);
extern List * BuildShardPlacementList(int64 shardId);
extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
extern void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName,
int *nodePort);
extern bool IsDummyPlacement(ShardPlacement *taskPlacement);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
SizeQueryType sizeQueryType,
bool optimizePartitionCalculations);

View File

@ -144,13 +144,6 @@ extern void SetLocalMultiShardModifyModeToSequential(void);
extern void EnsureSequentialMode(ObjectType objType);
extern void SetLocalForceMaxQueryParallelization(void);
extern void SortTupleStore(CitusScanState *scanState);
extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
extern bool ReadOnlyTask(TaskType taskType);
extern bool TaskListCannotBeExecutedInTransaction(List *taskList);
extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool
useOriginalCustomTypeOids);
extern ParamListInfo ExecutorBoundParams(void);
extern void EnsureTaskExecutionAllowed(bool isRemote);