Remove real-time and router executors (#3142)

* Remove unused executor code

All of the code of the real-time executor is removed. Some functions
of the router executor still remain there because they are shared
with other parts of the codebase. We'll move them to more accurate
places in follow-up commits.

* Move GUCs to transaction management and remove unused struct

* Update test output

* Get rid of references of real-time executor from code

* Warn if real-time executor is picked

* Remove lots of unused connection code

* Remove unused code for connection restrictions

The real-time and router executors cannot handle reuse of existing
connections within a transaction block.

The adaptive executor and COPY can reuse connections, so there is no
reason to keep the code that applied these restrictions in the
placement connection logic.
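
To make the removed restriction concrete, below is a minimal standalone C sketch (not Citus code; all names are illustrative) of the decision the deleted placement-connection branches enforced: once a claimed connection had executed DDL or DML for a placement, opening a second connection to that placement had to fail, because the new connection could not see the uncommitted results.

#include <stdbool.h>
#include <stdio.h>

/* illustrative stand-in for the per-placement connection state */
typedef struct PlacementConn
{
	bool inUse;   /* claimed exclusively by another task */
	bool hadDDL;  /* connection already ran DDL in this transaction */
	bool hadDML;  /* connection already ran DML in this transaction */
} PlacementConn;

/*
 * Mirrors the removed error branches: a second connection to the same
 * placement is only safe while the existing one has done read-only work.
 */
static bool
CanOpenExtraConnection(const PlacementConn *existing)
{
	if (!existing->inUse)
	{
		return true;    /* just reuse the existing connection instead */
	}
	if (existing->hadDDL || existing->hadDML)
	{
		return false;   /* a new connection cannot see uncommitted results */
	}
	return true;        /* read-only so far: an extra connection is safe */
}

int
main(void)
{
	PlacementConn conn = { .inUse = true, .hadDDL = false, .hadDML = true };

	printf("extra connection allowed: %d\n", CanOpenExtraConnection(&conn));
	return 0;
}

Since the adaptive executor (and COPY) can reuse connections within a transaction block, these error branches became dead code.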
Önder Kalacı 2019-11-05 12:48:10 +01:00 committed by GitHub
parent f0c35ad134
commit 960cd02c67
58 changed files with 1217 additions and 4831 deletions


@ -77,7 +77,6 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/multi_executor.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
@ -185,6 +184,15 @@ struct CopyShardState
List *placementStateList;
};
/* ShardConnections represents a set of connections for each placement of a shard */
typedef struct ShardConnections
{
int64 shardId;
/* list of MultiConnection structs */
List *connectionList;
} ShardConnections;
/* Local functions forward declarations */
static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
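
As a rough usage sketch of the moved struct (assumed for illustration, not part of this patch), COPY could populate one ShardConnections entry per shard from its placement list; shardId, placementList, and connectionFlags are assumed to be in scope, and GetPlacementConnection stands in for whatever connection-establishment call the caller uses:

ShardConnections *shardConnections = palloc0(sizeof(ShardConnections));
ListCell *placementCell = NULL;

shardConnections->shardId = shardId;

foreach(placementCell, placementList)
{
	ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);

	/* establish (or reuse) a connection for this placement */
	MultiConnection *connection =
		GetPlacementConnection(connectionFlags, placement, NULL);

	shardConnections->connectionList =
		lappend(shardConnections->connectionList, connection);
}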


@ -47,7 +47,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/transmit.h"
#include "distributed/version_compat.h"
@ -77,8 +77,6 @@ static List * PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString);
static List * PlanAlterObjectDependsStmt(AlterObjectDependsStmt *stmt,
const char *queryString);
static void ExecuteNodeBaseDDLCommands(List *taskList);
/*
* CitusProcessUtility is a convenience method to create a PlannedStmt out of pieces of a
@ -907,19 +905,8 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
EnsurePartitionTableNotReplicated(ddlJob->targetRelationId);
}
if (TaskExecutorType != MULTI_EXECUTOR_ADAPTIVE &&
ddlJob->targetRelationId == InvalidOid)
{
/*
* Some ddl jobs can only be run by the adaptive executor and not our legacy ones.
*
* These are tasks that are not pinned to any relation or shard. We can execute
* these very naively with a simple for loop that sends them to the target worker.
*/
ExecuteNodeBaseDDLCommands(ddlJob->taskList);
}
else if (!ddlJob->concurrentIndexCmd)
if (!ddlJob->concurrentIndexCmd)
{
if (shouldSyncMetadata)
{
@ -987,34 +974,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
}
/*
* ExecuteNodeBaseDDLCommands executes ddl commands naively only when we are not using the
* adaptive executor. It gets connections to the target placements and executes the
* commands.
*/
static void
ExecuteNodeBaseDDLCommands(List *taskList)
{
ListCell *taskCell = NULL;
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
ListCell *taskPlacementCell = NULL;
/* these tasks should not be pinned to any shard */
Assert(task->anchorShardId == INVALID_SHARD_ID);
foreach(taskPlacementCell, task->taskPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(taskPlacementCell);
SendCommandToWorkerAsUser(placement->nodeName, placement->nodePort, NULL,
task->queryString);
}
}
}
/*
* SetSearchPathToCurrentSearchPathCommand generates a command which can
* set the search path to the exact same search path that the issuing node


@ -17,7 +17,7 @@
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"


@ -15,7 +15,6 @@
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"


@ -1144,17 +1144,6 @@ SetCitusNoticeProcessor(MultiConnection *connection)
}
/*
* SetCitusNoticeLevel is used to set the notice level for distributed
* queries.
*/
void
SetCitusNoticeLevel(int level)
{
CitusNoticeLogLevel = level;
}
/*
* UnsetCitusNoticeLevel sets the CitusNoticeLogLevel back to
* its default value.


@ -71,7 +71,7 @@ struct ColocatedPlacementsHashEntry;
*
* This stores a list of connections for each placement, because multiple
* connections to the same placement may exist at the same time. E.g. a
* real-time executor query may reference the same placement in several
* adaptive executor query may reference the same placement in several
* sub-tasks.
*
* We keep track of whether a connection has executed DML or DDL, since we can
@ -256,23 +256,6 @@ StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *us
}
/*
* GetPlacementListConnection establishes a connection for a set of placement
* accesses.
*
* See StartPlacementListConnection for details.
*/
MultiConnection *
GetPlacementListConnection(uint32 flags, List *placementAccessList, const char *userName)
{
MultiConnection *connection = StartPlacementListConnection(flags, placementAccessList,
userName);
FinishConnectionEstablishment(connection);
return connection;
}
/*
* StartPlacementListConnection returns a connection to a remote node suitable for
* placement accesses (SELECT, DML, DDL), or throws an error if no suitable
@ -629,80 +612,6 @@ FindPlacementListConnection(int flags, List *placementAccessList, const char *us
foundModifyingConnection = true;
}
}
else if (placementConnection->hadDDL)
{
/*
* There is an existing connection, but we cannot use it and it executed
* DDL. Any subsequent operation needs to be able to see the results of
* the DDL command and thus cannot proceed if it cannot use the connection.
*/
Assert(placementConnection != NULL);
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot establish a new connection for "
"placement " UINT64_FORMAT
", since DDL has been executed on a connection that is in use",
placement->placementId)));
}
else if (placementConnection->hadDML)
{
/*
* There is an existing connection, but we cannot use it and it executed
* DML. Any subsequent operation needs to be able to see the results of
* the DML command and thus cannot proceed if it cannot use the connection.
*
* Note that this is not meaningfully different from the previous case. We
* just produce a different error message based on whether DDL or only
* DML was executed.
*/
Assert(placementConnection != NULL);
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
Assert(!placementConnection->hadDDL);
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot establish a new connection for "
"placement " UINT64_FORMAT
", since DML has been executed on a connection that is in use",
placement->placementId)));
}
else if (accessType == PLACEMENT_ACCESS_DDL)
{
/*
* There is an existing connection, but we cannot use it and we want to
* execute DDL. The operation on the existing connection might conflict
* with the DDL statement.
*/
Assert(placementConnection != NULL);
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
Assert(!placementConnection->hadDDL);
Assert(!placementConnection->hadDML);
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot perform a parallel DDL command because multiple "
"placements have been accessed over the same connection")));
}
else
{
/*
* The placement has a connection assigned to it, but it cannot be used,
* most likely because it has been claimed exclusively. Fortunately, it
* has only been used for reads and we're not performing a DDL command.
* We can therefore use a different connection for this placement.
*/
Assert(placementConnection != NULL);
Assert(!CanUseExistingConnection(flags, userName, placementConnection));
Assert(!placementConnection->hadDDL);
Assert(!placementConnection->hadDML);
Assert(accessType != PLACEMENT_ACCESS_DDL);
}
}
return chosenConnection;


@ -127,17 +127,20 @@
#include <sys/stat.h>
#include <unistd.h>
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/connection_management.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/local_executor.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_resowner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/placement_access.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
@ -150,6 +153,7 @@
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/int8.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/timestamp.h"
@ -553,6 +557,8 @@ static void AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *
static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
static bool TaskListRequires2PC(List *taskList);
static bool ReadOnlyTask(TaskType taskType);
static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
static void AssignTasksToConnections(DistributedExecution *execution);
static void UnclaimAllSessionConnections(List *sessionList);
@ -574,7 +580,6 @@ static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *ses
static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
WorkerSession *session);
static List * PlacementAccessListForTask(Task *task, ShardPlacement *taskPlacement);
static void ConnectionStateMachine(WorkerSession *session);
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
static bool TransactionModifiedDistributedTable(DistributedExecution *execution);
@ -592,6 +597,9 @@ static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution
static void PlacementExecutionReady(TaskPlacementExecution *placementExecution);
static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution *
shardCommandExecution);
static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
/*
@ -746,28 +754,13 @@ AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
/*
* ExecuteUtilityTaskListWithoutResults is a wrapper around executing task
* list for utility commands. If the adaptive executor is enabled, the function
* executes the task list via the adaptive executor. Else, the function goes
* through router executor.
* list for utility commands. It simply calls into the adaptive executor's
* task execution function.
*/
void
ExecuteUtilityTaskListWithoutResults(List *taskList)
{
if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE)
{
ExecuteTaskList(ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize);
}
else
{
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
ExecuteModifyTasksSequentiallyWithoutResults(taskList, ROW_MODIFY_NONE);
}
else
{
ExecuteModifyTasksWithoutResults(taskList);
}
}
ExecuteTaskList(ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize);
}
@ -1117,6 +1110,86 @@ DistributedExecutionRequiresRollback(DistributedExecution *execution)
}
/*
* TaskListRequires2PC determines whether the given task list requires 2PC
* because the tasks provided operate on a reference table or there are multiple
* tasks and the commit protocol is 2PC.
*
* Note that we currently do not generate task lists that involve multiple different
* tables, thus we only check the first task in the list for reference tables.
*/
static bool
TaskListRequires2PC(List *taskList)
{
Task *task = NULL;
bool multipleTasks = false;
uint64 anchorShardId = INVALID_SHARD_ID;
if (taskList == NIL)
{
return false;
}
task = (Task *) linitial(taskList);
if (task->replicationModel == REPLICATION_MODEL_2PC)
{
return true;
}
/*
* Some tasks don't set replicationModel, thus we rely on
* the anchorShardId as well as the replicationModel.
*
* TODO: Do we ever need replicationModel in the Task structure?
* Can't we always rely on anchorShardId?
*/
anchorShardId = task->anchorShardId;
if (anchorShardId != INVALID_SHARD_ID && ReferenceTableShardId(anchorShardId))
{
return true;
}
multipleTasks = list_length(taskList) > 1;
if (!ReadOnlyTask(task->taskType) &&
multipleTasks && MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
return true;
}
if (task->taskType == DDL_TASK)
{
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
task->replicationModel == REPLICATION_MODEL_2PC)
{
return true;
}
}
return false;
}
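
For context, a hedged sketch of how an executor might consume this predicate; CoordinatedTransactionUse2PC() exists in Citus' transaction management code, but this exact call site is illustrative:

/* illustrative only: flip the current transaction to 2PC when needed */
if (TaskListRequires2PC(taskList))
{
	CoordinatedTransactionUse2PC();
}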
/*
* ReadOnlyTask returns true if the input task does a read-only operation
* on the database.
*/
static bool
ReadOnlyTask(TaskType taskType)
{
if (taskType == ROUTER_TASK || taskType == SQL_TASK)
{
/*
* TODO: We currently do not execute modifying CTEs via ROUTER_TASK/SQL_TASK.
* When we implement it, we should either not use the mentioned task types for
* modifying CTEs or detect them here.
*/
return true;
}
return false;
}
/*
* SelectForUpdateOnReferenceTable returns true if the input task
* contains a FOR UPDATE clause that locks any reference table.
@ -2951,80 +3024,6 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
}
/*
* PlacementAccessListForTask returns a list of placement accesses for a given
* task and task placement.
*/
static List *
PlacementAccessListForTask(Task *task, ShardPlacement *taskPlacement)
{
List *placementAccessList = NIL;
List *relationShardList = task->relationShardList;
bool addAnchorAccess = false;
ShardPlacementAccessType accessType = PLACEMENT_ACCESS_SELECT;
if (task->taskType == MODIFY_TASK)
{
/* DML command */
addAnchorAccess = true;
accessType = PLACEMENT_ACCESS_DML;
}
else if (task->taskType == DDL_TASK || task->taskType == VACUUM_ANALYZE_TASK)
{
/* DDL command */
addAnchorAccess = true;
accessType = PLACEMENT_ACCESS_DDL;
}
else if (relationShardList == NIL)
{
/* SELECT query that does not touch any shard placements */
addAnchorAccess = true;
accessType = PLACEMENT_ACCESS_SELECT;
}
if (addAnchorAccess)
{
ShardPlacementAccess *placementAccess =
CreatePlacementAccess(taskPlacement, accessType);
placementAccessList = lappend(placementAccessList, placementAccess);
}
/*
* We've already added the anchor shard's placement access to the list. Now,
* add accesses for the other placements in the relationShardList.
*/
if (accessType == PLACEMENT_ACCESS_DDL)
{
/*
* All relations appearing in inter-shard DDL commands should be marked
* with DDL access.
*/
List *relationShardAccessList =
BuildPlacementDDLList(taskPlacement->groupId, relationShardList);
placementAccessList = list_concat(placementAccessList, relationShardAccessList);
}
else
{
/*
* In case of SELECTs or DMLs, we add SELECT placement accesses to the
* elements in relationShardList. For SELECT queries, this is trivial, since
* the query literally accesses the relationShardList in the same query.
*
* For DMLs, create placement accesses for placements that appear in a
* subselect.
*/
List *relationShardAccessList =
BuildPlacementSelectList(taskPlacement->groupId, relationShardList);
placementAccessList = list_concat(placementAccessList, relationShardAccessList);
}
return placementAccessList;
}
/*
* ReceiveResults reads the result of a command or query and writes returned
* rows to the tuple store of the scan state. It returns whether fetching results
@ -3703,3 +3702,90 @@ SetLocalForceMaxQueryParallelization(void)
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
}
/*
* ExtractParametersForRemoteExecution extracts parameter types and values from
* the given ParamListInfo structure, and fills parameter type and value arrays.
* It changes the oid of custom types to InvalidOid so that they can be
* resolved consistently on workers and the coordinator.
*/
static void
ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes,
const char ***parameterValues)
{
ExtractParametersFromParamList(paramListInfo, parameterTypes,
parameterValues, false);
}
/*
* ExtractParametersFromParamList extracts parameter types and values from
* the given ParamListInfo structure, and fills parameter type and value arrays.
* If useOriginalCustomTypeOids is true, it uses the original oids for custom types.
*/
void
ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool
useOriginalCustomTypeOids)
{
int parameterIndex = 0;
int parameterCount = paramListInfo->numParams;
*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));
/* get parameter types and values */
for (parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++)
{
ParamExternData *parameterData = &paramListInfo->params[parameterIndex];
Oid typeOutputFunctionId = InvalidOid;
bool variableLengthType = false;
/*
* Use 0 for data types where the oid values can be different on
* the master and worker nodes, so that the worker nodes can
* infer the correct oid.
*/
if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids)
{
(*parameterTypes)[parameterIndex] = 0;
}
else
{
(*parameterTypes)[parameterIndex] = parameterData->ptype;
}
/*
* If the parameter is not referenced / used (ptype == 0) and
* would otherwise have errored out inside standard_planner(),
* don't pass a value to the remote side, and pass text oid to prevent
* undetermined data type errors on workers.
*/
if (parameterData->ptype == 0)
{
(*parameterValues)[parameterIndex] = NULL;
(*parameterTypes)[parameterIndex] = TEXTOID;
continue;
}
/*
* If the parameter is NULL then we preserve its type, but
* don't need to evaluate its value.
*/
if (parameterData->isnull)
{
(*parameterValues)[parameterIndex] = NULL;
continue;
}
getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId,
&variableLengthType);
(*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId,
parameterData->value);
}
}
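
For context, a hedged sketch of how the two arrays filled above line up with libpq's parameter API; connection, parameterCount, and the hypothetical dist_table are assumed to be in scope:

/*
 * Illustrative only: the extracted arrays map directly onto
 * PQexecParams() arguments. A type oid of 0 lets the worker infer
 * the type; a NULL value transmits SQL NULL.
 */
PGresult *result = PQexecParams(connection->pgConn,
								"SELECT * FROM dist_table WHERE key = $1",
								parameterCount,
								parameterTypes,   /* Oid array filled above */
								parameterValues,  /* text-format values */
								NULL,             /* paramLengths: text mode */
								NULL,             /* paramFormats: all text */
								0);               /* text-format results */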


@ -13,11 +13,14 @@
#include "commands/copy.h"
#include "distributed/backend_data.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/query_stats.h"
#include "distributed/subplan_execution.h"
@ -30,14 +33,13 @@
/* functions for creating custom scan nodes */
static Node * AdaptiveExecutorCreateScan(CustomScan *scan);
static Node * RealTimeCreateScan(CustomScan *scan);
static Node * TaskTrackerCreateScan(CustomScan *scan);
static Node * RouterCreateScan(CustomScan *scan);
static Node * CoordinatorInsertSelectCreateScan(CustomScan *scan);
static Node * DelayedErrorCreateScan(CustomScan *scan);
/* functions that are common to different scans */
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);
@ -48,21 +50,11 @@ CustomScanMethods AdaptiveExecutorCustomScanMethods = {
AdaptiveExecutorCreateScan
};
CustomScanMethods RealTimeCustomScanMethods = {
"Citus Real-Time",
RealTimeCreateScan
};
CustomScanMethods TaskTrackerCustomScanMethods = {
"Citus Task-Tracker",
TaskTrackerCreateScan
};
CustomScanMethods RouterCustomScanMethods = {
"Citus Router",
RouterCreateScan
};
CustomScanMethods CoordinatorInsertSelectCustomScanMethods = {
"Citus INSERT ... SELECT via coordinator",
CoordinatorInsertSelectCreateScan
@ -86,15 +78,6 @@ static CustomExecMethods AdaptiveExecutorCustomExecMethods = {
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RealTimeCustomExecMethods = {
.CustomName = "RealTimeScan",
.BeginCustomScan = CitusBeginScan,
.ExecCustomScan = RealTimeExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods TaskTrackerCustomExecMethods = {
.CustomName = "TaskTrackerScan",
.BeginCustomScan = CitusBeginScan,
@ -104,24 +87,6 @@ static CustomExecMethods TaskTrackerCustomExecMethods = {
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RouterModifyCustomExecMethods = {
.CustomName = "RouterModifyScan",
.BeginCustomScan = CitusBeginScan,
.ExecCustomScan = RouterModifyExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RouterSelectCustomExecMethods = {
.CustomName = "RouterSelectScan",
.BeginCustomScan = CitusBeginScan,
.ExecCustomScan = RouterSelectExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = {
.CustomName = "CoordinatorInsertSelectScan",
.BeginCustomScan = CitusBeginScan,
@ -139,9 +104,7 @@ void
RegisterCitusCustomScanMethods(void)
{
RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods);
RegisterCustomScanMethods(&RealTimeCustomScanMethods);
RegisterCustomScanMethods(&TaskTrackerCustomScanMethods);
RegisterCustomScanMethods(&RouterCustomScanMethods);
RegisterCustomScanMethods(&CoordinatorInsertSelectCustomScanMethods);
RegisterCustomScanMethods(&DelayedErrorCustomScanMethods);
}
@ -204,6 +167,82 @@ CitusExecScan(CustomScanState *node)
}
/*
* CitusModifyBeginScan first evaluates expressions in the query and then
* performs shard pruning in case the partition column in an insert was
* defined as a function call.
*
* The function also checks the validity of the given custom scan node and
* gets locks on the shards involved in the task list of the distributed plan.
*/
static void
CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
{
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = NULL;
Job *workerJob = NULL;
Query *jobQuery = NULL;
List *taskList = NIL;
/*
* We must not change the distributed plan since it may be reused across multiple
* executions of a prepared statement. Instead we create a deep copy that we only
* use for the current execution.
*/
distributedPlan = scanState->distributedPlan = copyObject(scanState->distributedPlan);
workerJob = distributedPlan->workerJob;
jobQuery = workerJob->jobQuery;
taskList = workerJob->taskList;
if (workerJob->requiresMasterEvaluation)
{
PlanState *planState = &(scanState->customScanState.ss.ps);
EState *executorState = planState->state;
ExecuteMasterEvaluableFunctions(jobQuery, planState);
/*
* We've processed parameters in ExecuteMasterEvaluableFunctions and
* don't need to send their values to workers, since they will be
* represented as constants in the deparsed query. To avoid sending
* parameter values, we set the parameter list to NULL.
*/
executorState->es_param_list_info = NULL;
if (workerJob->deferredPruning)
{
DeferredErrorMessage *planningError = NULL;
/* need to perform shard pruning, rebuild the task list from scratch */
taskList = RouterInsertTaskList(jobQuery, &planningError);
if (planningError != NULL)
{
RaiseDeferredError(planningError, ERROR);
}
workerJob->taskList = taskList;
workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
}
RebuildQueryStrings(jobQuery, taskList);
}
/* prevent concurrent placement changes */
AcquireMetadataLocks(taskList);
/*
* We are taking locks on partitions of partitioned tables. These locks are
* necessary for locking tables that appear in the SELECT part of the query.
*/
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
}
/*
* AdaptiveExecutorCreateScan creates the scan state for the adaptive executor.
*/
@ -222,24 +261,6 @@ AdaptiveExecutorCreateScan(CustomScan *scan)
}
/*
* RealTimeCreateScan creates the scan state for real-time executor queries.
*/
static Node *
RealTimeCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_REAL_TIME;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &RealTimeCustomExecMethods;
return (Node *) scanState;
}
/*
* TaskTrackerCreateScan creates the scan state for task-tracker executor queries.
*/
@ -258,49 +279,6 @@ TaskTrackerCreateScan(CustomScan *scan)
}
/*
* RouterCreateScan creates the scan state for router executor queries.
*/
static Node *
RouterCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
DistributedPlan *distributedPlan = NULL;
Job *workerJob = NULL;
List *taskList = NIL;
bool isModificationQuery = false;
List *relationRowLockList = NIL;
scanState->executorType = MULTI_EXECUTOR_ROUTER;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
distributedPlan = scanState->distributedPlan;
workerJob = distributedPlan->workerJob;
taskList = workerJob->taskList;
isModificationQuery = IsModifyDistributedPlan(distributedPlan);
if (list_length(taskList) == 1)
{
Task *task = (Task *) linitial(taskList);
relationRowLockList = task->relationRowLockList;
}
/* if query is SELECT ... FOR UPDATE query, use modify logic */
if (isModificationQuery || relationRowLockList != NIL)
{
scanState->customScanState.methods = &RouterModifyCustomExecMethods;
}
else
{
scanState->customScanState.methods = &RouterSelectCustomExecMethods;
}
return (Node *) scanState;
}
/*
* CoordinatorInsertSelectCreateScan creates the scan state for executing
* INSERT..SELECT into a distributed table via the coordinator.
@ -362,8 +340,7 @@ CitusEndScan(CustomScanState *node)
/* queryId is not set if pg_stat_statements is not installed */
if (queryId != 0)
{
if (partitionKeyConst != NULL && (executorType == MULTI_EXECUTOR_ROUTER ||
executorType == MULTI_EXECUTOR_ADAPTIVE))
if (partitionKeyConst != NULL && executorType == MULTI_EXECUTOR_ADAPTIVE)
{
partitionKeyString = DatumToString(partitionKeyConst->constvalue,
partitionKeyConst->consttype);


@ -0,0 +1,449 @@
/*-------------------------------------------------------------------------
*
* distributed_execution_locks.c
*
* Definitions of the functions used for acquiring locks during
* distributed execution.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "distributed/distributed_execution_locks.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
static bool RequiresConsistentSnapshot(Task *task);
static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel);
static void AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList);
/*
* AcquireExecutorShardLocks acquires locks on shards for the given task if
* necessary to avoid divergence between multiple replicas of the same shard.
* No lock is obtained when there is only one replica.
*
* The function determines the appropriate lock mode based on the commutativity
* rule of the command. In each case, it uses a lock mode that enforces the
* commutativity rule.
*
* The mapping is overridden when all_modifications_commutative is set to true.
* In that case, all modifications are treated as commutative, which can be used
* to communicate that the application is only generating commutative
* UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
*/
void
AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel)
{
AcquireExecutorShardLockForRowModify(task, modLevel);
AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList);
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas. In particular this prevents INSERT.. SELECT
* commands from having a different effect on different placements.
*/
if (RequiresConsistentSnapshot(task))
{
/*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
LockRelationShardResources(task->relationShardList, ExclusiveLock);
}
}
/*
* AcquireExecutorMultiShardLocks acquires shard locks needed for execution
* of writes on multiple shards. In addition to honouring commutativity
* rules, we currently only allow a single multi-shard command on a shard at
* a time. Otherwise, concurrent multi-shard commands may take row-level
* locks on the shard placements in a different order and create a distributed
* deadlock. This applies even when writes are commutative and/or there is
* no replication.
*
* 1. If citus.all_modifications_commutative is set to true, then all locks
* are acquired as ShareUpdateExclusiveLock.
*
* 2. If citus.all_modifications_commutative is false, then only the shards
* with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
* lock is acquired with ShareUpdateExclusiveLock.
*
* ShareUpdateExclusiveLock conflicts with itself such that only one
* multi-shard modification at a time is allowed on a shard. It also conflicts
* with ExclusiveLock, which ensures that updates/deletes/upserts are applied
* in the same order on all placements. It does not conflict with
* RowExclusiveLock, which is normally obtained by single-shard, commutative
* writes.
*/
void
AcquireExecutorMultiShardLocks(List *taskList)
{
ListCell *taskCell = NULL;
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
LOCKMODE lockMode = NoLock;
if (task->anchorShardId == INVALID_SHARD_ID)
{
/* no shard locks to take if the task is not anchored to a shard */
continue;
}
if (AllModificationsCommutative || list_length(task->taskPlacementList) == 1)
{
/*
* When all writes are commutative then we only need to prevent multi-shard
* commands from running concurrently with each other and with commands
* that are explicitly non-commutative. When there is no replication then
* we only need to prevent concurrent multi-shard commands.
*
* In either case, ShareUpdateExclusive has the desired effect, since
* it conflicts with itself and ExclusiveLock (taken by non-commutative
* writes).
*
* However, some users find this too restrictive, so we allow them to
* reduce to a RowExclusiveLock when citus.enable_deadlock_prevention
* is disabled, which lets multi-shard modifications run in parallel as
* long as they all disable the GUC.
*/
if (EnableDeadlockPrevention)
{
lockMode = ShareUpdateExclusiveLock;
}
else
{
lockMode = RowExclusiveLock;
}
}
else
{
/*
* When there is replication, prevent all concurrent writes to the same
* shards to ensure the writes are ordered.
*/
lockMode = ExclusiveLock;
}
/*
* If we are dealing with a partition, we also take locks on the parent table
* to prevent deadlocks on concurrent operations on a partition and its parent.
*/
LockParentShardResourceIfPartition(task->anchorShardId, lockMode);
LockShardResource(task->anchorShardId, lockMode);
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas.
*/
if (RequiresConsistentSnapshot(task))
{
/*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
LockRelationShardResources(task->relationShardList, ExclusiveLock);
}
}
}
/*
* RequiresConsistentSnapshot returns true if the given task needs to take
* the necessary locks to ensure that a subquery in the modify query
* returns the same output for all task placements.
*/
static bool
RequiresConsistentSnapshot(Task *task)
{
bool requiresIsolation = false;
if (!task->modifyWithSubquery)
{
/*
* Other commands do not read from other shards.
*/
requiresIsolation = false;
}
else if (list_length(task->taskPlacementList) == 1)
{
/*
* If there is only one replica then we fully rely on PostgreSQL to
* provide SELECT isolation. In this case, we do not provide isolation
* across the shards, but that was never our intention.
*/
requiresIsolation = false;
}
else if (AllModificationsCommutative)
{
/*
* An INSERT/SELECT is commutative with other writes if it excludes
* any ongoing writes based on the filter conditions. Without knowing
* whether this is true, we assume the user took this into account
* when enabling citus.all_modifications_commutative. This option
* gives users an escape from aggressive locking during INSERT/SELECT.
*/
requiresIsolation = false;
}
else
{
/*
* If this is a non-commutative write, then we need to block ongoing
* writes to make sure that the subselect returns the same result
* on all placements.
*/
requiresIsolation = true;
}
return requiresIsolation;
}
/*
* AcquireMetadataLocks acquires metadata locks on each of the anchor
* shards in the task list to prevent a shard being modified while it
* is being copied.
*/
void
AcquireMetadataLocks(List *taskList)
{
ListCell *taskCell = NULL;
/*
* Note: to avoid the overhead of additional sorting, we assume tasks
* to be already sorted by shard ID such that deadlocks are avoided.
* This is true for INSERT/SELECT, which is the only multi-shard
* command right now.
*/
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
LockShardDistributionMetadata(task->anchorShardId, ShareLock);
}
}
static void
AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel)
{
LOCKMODE lockMode = NoLock;
int64 shardId = task->anchorShardId;
if (shardId == INVALID_SHARD_ID)
{
return;
}
if (modLevel <= ROW_MODIFY_READONLY)
{
/*
* The executor shard lock is used to maintain consistency between
* replicas and therefore no lock is required for read-only queries
* or in general when there is only one replica.
*/
lockMode = NoLock;
}
else if (list_length(task->taskPlacementList) == 1)
{
if (task->replicationModel == REPLICATION_MODEL_2PC)
{
/*
* While we don't need a lock to ensure writes are applied in
* a consistent order when there is a single replica, we also use
* shard resource locks as a crude implementation of SELECT..
* FOR UPDATE on reference tables, so we should always take
* a lock that conflicts with the FOR UPDATE/SHARE locks.
*/
lockMode = RowExclusiveLock;
}
else
{
/*
* When there is no replication, the worker itself can decide on
* the order in which writes are applied.
*/
lockMode = NoLock;
}
}
else if (AllModificationsCommutative)
{
/*
* Bypass commutativity checks when citus.all_modifications_commutative
* is enabled.
*
* A RowExclusiveLock does not conflict with itself and therefore allows
* multiple commutative commands to proceed concurrently. It does
* conflict with ExclusiveLock, which may still be obtained by another
* session that executes an UPDATE/DELETE/UPSERT command with
* citus.all_modifications_commutative disabled.
*/
lockMode = RowExclusiveLock;
}
else if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
{
/*
* An INSERT commutes with other INSERT commands, since performing them
* out-of-order only affects the table order on disk, but not the
* contents.
*
* When a unique constraint exists, INSERTs are not strictly commutative,
* but whichever INSERT comes last will error out and thus has no effect.
* INSERT is not commutative with UPDATE/DELETE/UPSERT, since the
* UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution
* order.
*
* A RowExclusiveLock does not conflict with itself and therefore allows
* multiple INSERT commands to proceed concurrently. It conflicts with
* ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do
* not run concurrently with INSERT.
*/
lockMode = RowExclusiveLock;
}
else
{
/*
* UPDATE/DELETE/UPSERT commands do not commute with other modifications
* since the rows modified by one command may be affected by the outcome
* of another command.
*
* We need to handle upsert before INSERT, because PostgreSQL models
* upsert commands as INSERT with an ON CONFLICT section.
*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
lockMode = ExclusiveLock;
}
if (lockMode != NoLock)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode);
}
}
static void
AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList)
{
ListCell *relationRowLockCell = NULL;
LOCKMODE rowLockMode = NoLock;
if (relationRowLockList == NIL)
{
return;
}
/*
* If a lock clause exists and it affects any reference table, we need to get
* a lock on the shard resource. The type of lock is determined by the type
* of row lock given in the query. If the type of row lock is either FOR NO
* KEY UPDATE or FOR UPDATE we get ExclusiveLock on the shard resource. We
* get ShareLock if it is FOR SHARE or FOR KEY SHARE.
*
* We have selected these lock types according to the conflict table given in
* the Postgres documentation. It states that FOR UPDATE and FOR NO KEY UPDATE
* must conflict with every other modify command. By getting ExclusiveLock
* we guarantee that. Note that getting ExclusiveLock does not mimic the
* behaviour of Postgres exactly: row locks with FOR NO KEY UPDATE and
* FOR KEY SHARE do not conflict in Postgres, yet they block each other in
* our implementation. Since FOR SHARE and FOR KEY SHARE do not conflict
* with each other but conflict with modify commands, we get ShareLock for
* them.
*/
foreach(relationRowLockCell, relationRowLockList)
{
RelationRowLock *relationRowLock = lfirst(relationRowLockCell);
LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength;
Oid relationId = relationRowLock->relationId;
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
{
List *shardIntervalList = LoadShardIntervalList(relationId);
if (rowLockStrength == LCS_FORKEYSHARE || rowLockStrength == LCS_FORSHARE)
{
rowLockMode = ShareLock;
}
else if (rowLockStrength == LCS_FORNOKEYUPDATE ||
rowLockStrength == LCS_FORUPDATE)
{
rowLockMode = ExclusiveLock;
}
SerializeNonCommutativeWrites(shardIntervalList, rowLockMode);
}
}
}
/*
* LockPartitionsInRelationList iterates over the given list and acquires locks on
* partitions of each partitioned table. It does nothing for non-partitioned tables.
*/
void
LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode)
{
ListCell *relationIdCell = NULL;
foreach(relationIdCell, relationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
if (PartitionedTable(relationId))
{
LockPartitionRelations(relationId, lockmode);
}
}
}
/*
* LockPartitionRelations acquires a relation lock on all partitions of the
* given partitioned relation. This function expects that given relation is a
* partitioned relation.
*/
void
LockPartitionRelations(Oid relationId, LOCKMODE lockMode)
{
/*
* PartitionList function generates partition list in the same order
* as PostgreSQL. Therefore we do not need to sort it before acquiring
* locks.
*/
List *partitionList = PartitionList(relationId);
ListCell *partitionCell = NULL;
foreach(partitionCell, partitionList)
{
Oid partitionRelationId = lfirst_oid(partitionCell);
LockRelationOid(partitionRelationId, lockMode);
}
}
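
A hedged sketch (LockTaskListForExecution is a hypothetical helper, not part of this patch) of the order in which an executor might invoke the lock functions above before running a modification task list:

static void
LockTaskListForExecution(List *taskList, RowModifyLevel modLevel)
{
	ListCell *taskCell = NULL;

	if (list_length(taskList) > 1)
	{
		/* multi-shard writes also guard against distributed deadlock */
		AcquireExecutorMultiShardLocks(taskList);
		return;
	}

	foreach(taskCell, taskList)
	{
		Task *task = (Task *) lfirst(taskCell);

		/* per-task shard locks based on commutativity of the command */
		AcquireExecutorShardLocks(task, modLevel);
	}
}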


@ -12,13 +12,13 @@
#include "miscadmin.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/distributed_planner.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
@ -170,34 +170,17 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
if (prunedTaskList != NIL)
{
if (TaskExecutorType != MULTI_EXECUTOR_ADAPTIVE)
{
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
ExecuteModifyTasksSequentially(scanState, prunedTaskList,
ROW_MODIFY_COMMUTATIVE,
hasReturning);
}
else
{
ExecuteMultipleTasks(scanState, prunedTaskList, true,
hasReturning);
}
}
else
{
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
bool randomAccess = true;
bool interTransactions = false;
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
bool randomAccess = true;
bool interTransactions = false;
Assert(scanState->tuplestorestate == NULL);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
Assert(scanState->tuplestorestate == NULL);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
tupleDescriptor, scanState->tuplestorestate,
hasReturning, MaxAdaptiveExecutorPoolSize);
}
ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
tupleDescriptor, scanState->tuplestorestate,
hasReturning, MaxAdaptiveExecutorPoolSize);
if (SortReturning && hasReturning)
{


@ -65,10 +65,9 @@
* currently only supports queries. In other words, any utility command like TRUNCATE
* fails if it is executed after a local execution inside a transaction block.
* Fourth, local execution cannot be mixed with executors other than the adaptive one,
* namely task-tracker, real-time and router executors. Finally, related with the
* previous item, COPY command cannot be mixed with local execution in a transaction.
* The implication of that any part of INSERT..SELECT via coordinator cannot happen
* via the local execution.
* namely the task-tracker executor. Finally, related to the previous item, the COPY
* command cannot be mixed with local execution in a transaction. The implication is
* that no part of INSERT..SELECT via the coordinator can happen via local execution.
*/
#include "postgres.h"
#include "miscadmin.h"
@ -79,7 +78,6 @@
#include "distributed/metadata_cache.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h" /* to access LogRemoteCommands */
#include "distributed/multi_router_executor.h"
#include "distributed/transaction_management.h"
#include "executor/tstoreReceiver.h"
#include "executor/tuptable.h"


@ -177,65 +177,6 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
}
/*
* MultiClientPlacementConnectStart asynchronously tries to establish a connection
* for a particular set of shard placements. If it succeeds, it returns the
* connection id. Otherwise, it reports a connection error and returns
* INVALID_CONNECTION_ID.
*/
int32
MultiClientPlacementConnectStart(List *placementAccessList, const char *userName)
{
MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
int32 connectionId = AllocateConnectionId();
int connectionFlags = 0;
/*
* Although we're opening connections for SELECT queries, we're relying
* on the multi_shard_modify_mode GUC. The name of the GUC is unfortunate, but
* adding one more GUC (or renaming the GUC) would make the UX even worse.
*/
if (MultiShardConnectionType == PARALLEL_CONNECTION)
{
connectionFlags = CONNECTION_PER_PLACEMENT;
}
if (connectionId == INVALID_CONNECTION_ID)
{
ereport(WARNING, (errmsg("could not allocate connection in connection pool")));
return connectionId;
}
/* prepare asynchronous request for worker node connection */
connection = StartPlacementListConnection(connectionFlags, placementAccessList,
userName);
ClaimConnectionExclusively(connection);
connStatusType = PQstatus(connection->pgConn);
/*
* If prepared, we save the connection, and set its initial polling status
* to PGRES_POLLING_WRITING as specified in "Database Connection Control
* Functions" section of the PostgreSQL documentation.
*/
if (connStatusType != CONNECTION_BAD)
{
ClientConnectionArray[connectionId] = connection;
ClientPollingStatusArray[connectionId] = PGRES_POLLING_WRITING;
}
else
{
ReportConnectionError(connection, WARNING);
connectionId = INVALID_CONNECTION_ID;
}
return connectionId;
}
/* MultiClientConnectPoll returns the status of client connection. */
ConnectStatus
MultiClientConnectPoll(int32 connectionId)
@ -290,22 +231,6 @@ MultiClientConnectPoll(int32 connectionId)
}
/* MultiClientGetConnection returns the connection with the given ID from the pool */
MultiConnection *
MultiClientGetConnection(int32 connectionId)
{
if (connectionId == INVALID_CONNECTION_ID)
{
return NULL;
}
Assert(connectionId >= 0);
Assert(connectionId < MAX_CONNECTION_COUNT);
return ClientConnectionArray[connectionId];
}
/* MultiClientDisconnect disconnects the connection. */
void
MultiClientDisconnect(int32 connectionId)
@ -324,40 +249,6 @@ MultiClientDisconnect(int32 connectionId)
}
/*
* MultiClientReleaseConnection removes a connection from the client
* executor pool without disconnecting if it is run inside a transaction;
* otherwise it disconnects.
*
* This allows the connection to be used for other operations in the
* same transaction. The connection will still be closed at COMMIT
* or ABORT time.
*/
void
MultiClientReleaseConnection(int32 connectionId)
{
MultiConnection *connection = NULL;
const int InvalidPollingStatus = -1;
Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
/* allow using same connection only in the same transaction */
if (!InCoordinatedTransaction())
{
MultiClientDisconnect(connectionId);
}
else
{
UnclaimConnection(connection);
}
ClientConnectionArray[connectionId] = NULL;
ClientPollingStatusArray[connectionId] = InvalidPollingStatus;
}
/*
* MultiClientConnectionUp checks if the connection status is up, in other words,
* it is not bad.
@ -383,26 +274,6 @@ MultiClientConnectionUp(int32 connectionId)
}
/* MultiClientExecute synchronously executes a query over the given connection. */
bool
MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
int *rowCount, int *columnCount)
{
bool querySent = false;
bool queryOK = false;
querySent = MultiClientSendQuery(connectionId, query);
if (!querySent)
{
return false;
}
queryOK = MultiClientQueryResult(connectionId, queryResult, rowCount, columnCount);
return queryOK;
}
/* MultiClientSendQuery sends the given query over the given connection. */
bool
MultiClientSendQuery(int32 connectionId, const char *query)
@ -499,51 +370,6 @@ MultiClientResultStatus(int32 connectionId)
}
/* MultiClientQueryResult gets results for an asynchronous query. */
bool
MultiClientQueryResult(int32 connectionId, void **queryResult, int *rowCount,
int *columnCount)
{
MultiConnection *connection = NULL;
PGresult *result = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
ExecStatusType resultStatus = PGRES_COMMAND_OK;
bool raiseInterrupts = true;
Assert(connectionId != INVALID_CONNECTION_ID);
connection = ClientConnectionArray[connectionId];
Assert(connection != NULL);
connStatusType = PQstatus(connection->pgConn);
if (connStatusType == CONNECTION_BAD)
{
ereport(WARNING, (errmsg("could not maintain connection to worker node")));
return false;
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
resultStatus = PQresultStatus(result);
if (resultStatus == PGRES_TUPLES_OK)
{
(*queryResult) = (void **) result;
(*rowCount) = PQntuples(result);
(*columnCount) = PQnfields(result);
}
else
{
ReportResultError(connection, result, WARNING);
PQclear(result);
return false;
}
/* clear extra result objects */
ForgetResults(connection);
return true;
}
/*
* MultiClientBatchResult returns results for a "batch" of queries, meaning a
* string containing multiple select statements separated by semicolons. This
@ -619,15 +445,6 @@ MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex)
}
/* MultiClientValueIsNull returns whether the value at the given position is null. */
bool
MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex)
{
bool isNull = PQgetisnull((PGresult *) queryResult, rowIndex, columnIndex);
return isNull;
}
/* MultiClientClearResult frees the memory associated with a PGresult. */
void
MultiClientClearResult(void *queryResult)
@ -815,236 +632,6 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnByte
}
/*
* MultiClientCreateWaitInfo creates a WaitInfo structure, capable of keeping
* track of what maxConnections connections are waiting for, to allow
* efficiently waiting for all of them at once.
*
* Connections can be added using MultiClientRegisterWait(). All added
* connections can then be waited upon together using MultiClientWait().
*/
WaitInfo *
MultiClientCreateWaitInfo(int maxConnections)
{
WaitInfo *waitInfo = palloc(sizeof(WaitInfo));
#ifndef HAVE_POLL
/* we subtract 2 to make room for the WL_POSTMASTER_DEATH and WL_LATCH_SET events */
if (maxConnections > FD_SETSIZE - 2)
{
maxConnections = FD_SETSIZE - 2;
}
#endif
waitInfo->maxWaiters = maxConnections;
/* we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
waitInfo->pollfds = palloc(maxConnections * sizeof(struct pollfd));
#endif
/* initialize remaining fields */
MultiClientResetWaitInfo(waitInfo);
return waitInfo;
}
/* MultiClientResetWaitInfo clears all pending waits from a WaitInfo. */
void
MultiClientResetWaitInfo(WaitInfo *waitInfo)
{
waitInfo->registeredWaiters = 0;
waitInfo->haveReadyWaiter = false;
waitInfo->haveFailedWaiter = false;
#ifndef HAVE_POLL
FD_ZERO(&(waitInfo->readFileDescriptorSet));
FD_ZERO(&(waitInfo->writeFileDescriptorSet));
FD_ZERO(&(waitInfo->exceptionFileDescriptorSet));
waitInfo->maxConnectionFileDescriptor = 0;
#endif
}
/* MultiClientFreeWaitInfo frees the resources associated with a waitInfo struct. */
void
MultiClientFreeWaitInfo(WaitInfo *waitInfo)
{
#ifdef HAVE_POLL
pfree(waitInfo->pollfds);
#endif
pfree(waitInfo);
}
/*
* MultiClientRegisterWait adds a connection to be waited upon, waiting for
* executionStatus.
*/
void
MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus executionStatus,
int32 connectionId)
{
MultiConnection *connection = NULL;
#ifdef HAVE_POLL
struct pollfd *pollfd = NULL;
#else
int connectionFileDescriptor = 0;
#endif
/* This is to make sure we never register more than maxWaiters on Windows */
if (waitInfo->registeredWaiters >= waitInfo->maxWaiters)
{
return;
}
if (executionStatus == TASK_STATUS_READY)
{
waitInfo->haveReadyWaiter = true;
return;
}
else if (executionStatus == TASK_STATUS_ERROR)
{
waitInfo->haveFailedWaiter = true;
return;
}
connection = ClientConnectionArray[connectionId];
#ifdef HAVE_POLL
pollfd = &waitInfo->pollfds[waitInfo->registeredWaiters];
pollfd->fd = PQsocket(connection->pgConn);
if (executionStatus == TASK_STATUS_SOCKET_READ)
{
pollfd->events = POLLERR | POLLIN;
}
else if (executionStatus == TASK_STATUS_SOCKET_WRITE)
{
pollfd->events = POLLERR | POLLOUT;
}
#else
connectionFileDescriptor = PQsocket(connection->pgConn);
if (connectionFileDescriptor > waitInfo->maxConnectionFileDescriptor)
{
waitInfo->maxConnectionFileDescriptor = connectionFileDescriptor;
}
if (executionStatus == TASK_STATUS_SOCKET_READ)
{
FD_SET(connectionFileDescriptor, &(waitInfo->readFileDescriptorSet));
}
else if (executionStatus == TASK_STATUS_SOCKET_WRITE)
{
FD_SET(connectionFileDescriptor, &(waitInfo->writeFileDescriptorSet));
}
#endif
waitInfo->registeredWaiters++;
}
/*
* MultiClientWait waits until at least one connection added with
* MultiClientRegisterWait is ready to be processed again.
*/
void
MultiClientWait(WaitInfo *waitInfo)
{
/*
* If we had a failure, we always want to sleep for a bit, to prevent
* flooding the other system, which would probably make the situation worse.
*/
if (waitInfo->haveFailedWaiter)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
pg_usleep(sleepIntervalPerCycle);
return;
}
/* if there are tasks that already need attention again, don't wait */
if (waitInfo->haveReadyWaiter)
{
return;
}
while (true)
{
/*
* Wait for activity on any of the sockets. Limit the maximum time
* spent waiting in one wait cycle, as insurance against edge
* cases. For efficiency we don't want to wake quite as often as
* citus.remote_task_check_interval, so rather arbitrarily sleep ten
* times as long.
*/
#ifdef HAVE_POLL
int rc = poll(waitInfo->pollfds, waitInfo->registeredWaiters,
RemoteTaskCheckInterval * 10);
#else
int maxConnectionFileDescriptor = waitInfo->maxConnectionFileDescriptor;
const int maxTimeout = RemoteTaskCheckInterval * 10 * 1000L;
struct timeval selectTimeout = { 0, maxTimeout };
int rc = 0;
/* it is not okay to call select when there is nothing to wait for */
if (waitInfo->registeredWaiters == 0)
{
return;
}
rc = (select) (maxConnectionFileDescriptor + 1,
&(waitInfo->readFileDescriptorSet),
&(waitInfo->writeFileDescriptorSet),
&(waitInfo->exceptionFileDescriptorSet),
&selectTimeout);
#endif
if (rc < 0)
{
/*
* Signals that arrive can interrupt our poll(). In that case just
* return. Every other error is unexpected and treated as such.
*/
int errorCode = errno;
#ifdef WIN32
errorCode = WSAGetLastError();
#endif
if (errorCode == 0)
{
return;
}
else if (errorCode == EAGAIN || errorCode == EINTR)
{
return;
}
else
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("poll failed: %m")));
}
}
else if (rc == 0)
{
ereport(DEBUG5,
(errmsg("waiting for activity on tasks took longer than %d ms",
(int) RemoteTaskCheckInterval * 10)));
}
/*
* At least one fd received a readiness notification, time to
* process tasks again.
*/
return;
}
}
/*
* ClientConnectionReady checks if the given connection is ready for non-blocking
* reads or writes. This function is loosely based on pqSocketCheck() at fe-misc.c


@ -23,16 +23,18 @@
#include "distributed/multi_executor.h"
#include "distributed/multi_master_planner.h"
#include "distributed/distributed_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_resowner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_protocol.h"
#include "executor/execdebug.h"
#include "commands/copy.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "parser/parsetree.h"
#include "parser/parse_oper.h"
#include "storage/lmgr.h"
#include "tcop/dest.h"
#include "tcop/pquery.h"
@ -43,12 +45,16 @@
/*
* Controls the connection type for multi-shard modifications, DDLs,
* TRUNCATE and real-time SELECT queries.
* TRUNCATE and multi-shard SELECT queries.
*/
int MultiShardConnectionType = PARALLEL_CONNECTION;
bool WritableStandbyCoordinator = false;
/* sort the returning to get consistent outputs, used only for testing */
bool SortReturning = false;
/* local function forward declarations */
static bool IsCitusPlan(Plan *plan);
static bool IsCitusCustomScan(Plan *plan);
@ -265,7 +271,7 @@ ReturnTupleFromTuplestore(CitusScanState *scanState)
/*
* Load data collected by real-time or task-tracker executors into the tuplestore
* Load data collected by the task-tracker executor into the tuplestore
* of CitusScanState. For that, we first create a tuple store, and then copy the
* files one-by-one into the tuple store.
*
@ -371,6 +377,106 @@ ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescript
}
/*
* SortTupleStore gets a CitusScanState and sorts the tuplestore by all the
* entries in the target entry list, starting from the first one and
* ending with the last entry.
*
* The sorting is done in ASC order.
*/
void
SortTupleStore(CitusScanState *scanState)
{
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
Tuplestorestate *tupleStore = scanState->tuplestorestate;
List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
uint32 expectedColumnCount = list_length(targetList);
/* Convert list-ish representation to arrays wanted by executor */
int numberOfSortKeys = expectedColumnCount;
AttrNumber *sortColIdx = (AttrNumber *) palloc(numberOfSortKeys * sizeof(AttrNumber));
Oid *sortOperators = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
Oid *collations = (Oid *) palloc(numberOfSortKeys * sizeof(Oid));
bool *nullsFirst = (bool *) palloc(numberOfSortKeys * sizeof(bool));
ListCell *targetCell = NULL;
int sortKeyIndex = 0;
Tuplesortstate *tuplesortstate = NULL;
/*
* Iterate on the returning target list and generate the necessary information
* for sorting the tuples.
*/
foreach(targetCell, targetList)
{
TargetEntry *returningEntry = (TargetEntry *) lfirst(targetCell);
Oid sortop = InvalidOid;
/* determine the sortop, we don't need anything else */
get_sort_group_operators(exprType((Node *) returningEntry->expr),
true, false, false,
&sortop, NULL, NULL,
NULL);
sortColIdx[sortKeyIndex] = sortKeyIndex + 1;
sortOperators[sortKeyIndex] = sortop;
collations[sortKeyIndex] = exprCollation((Node *) returningEntry->expr);
nullsFirst[sortKeyIndex] = false;
sortKeyIndex++;
}
tuplesortstate =
tuplesort_begin_heap(tupleDescriptor, numberOfSortKeys, sortColIdx, sortOperators,
collations, nullsFirst, work_mem, NULL, false);
while (true)
{
TupleTableSlot *slot = ReturnTupleFromTuplestore(scanState);
if (TupIsNull(slot))
{
break;
}
/* tuplesort_puttupleslot copies the slot into sort context */
tuplesort_puttupleslot(tuplesortstate, slot);
}
/* perform the actual sort operation */
tuplesort_performsort(tuplesortstate);
/*
* Truncate the existing tupleStore, because we'll fill it back
* from the sorted tuplestore.
*/
tuplestore_clear(tupleStore);
/* iterate over all the sorted tuples, add them to original tuplestore */
while (true)
{
TupleTableSlot *newSlot = MakeSingleTupleTableSlotCompat(tupleDescriptor,
&TTSOpsMinimalTuple);
bool found = tuplesort_gettupleslot(tuplesortstate, true, false, newSlot, NULL);
if (!found)
{
break;
}
/* tuplestore_puttupleslot copies the slot into the tupleStore context */
tuplestore_puttupleslot(tupleStore, newSlot);
}
tuplestore_rescan(scanState->tuplestorestate);
/* terminate the sort, clear unnecessary resources */
tuplesort_end(tuplesortstate);
}
/*
* StubRelation creates a stub Relation from the given tuple descriptor.
* To be able to use copy.c, we need a Relation descriptor. As there is no

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -2,7 +2,7 @@
*
* multi_server_executor.c
*
* Function definitions for distributed task execution for real-time
* Function definitions for distributed task execution for adaptive
* and task-tracker executors, and routines common to both. The common
 * routines implement backend-side logic, and they trigger executions
 * on the client side via function hooks that they load.
@ -176,7 +176,6 @@ InitTaskExecution(Task *task, TaskExecStatus initialTaskExecStatus)
taskExecution->jobId = task->jobId;
taskExecution->taskId = task->taskId;
taskExecution->nodeCount = nodeCount;
taskExecution->connectStartTime = 0;
taskExecution->currentNodeIndex = 0;
taskExecution->failureCount = 0;
@ -244,11 +243,6 @@ CleanupTaskExecution(TaskExecution *taskExecution)
bool
TaskExecutionFailed(TaskExecution *taskExecution)
{
if (taskExecution->criticalErrorOccurred)
{
return true;
}
if (taskExecution->failureCount >= MAX_TASK_EXECUTION_FAILURES)
{
return true;

View File

@ -28,12 +28,14 @@
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_nodes.h"
#include "distributed/connection_management.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_resowner.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/subplan_execution.h"
@ -91,6 +93,7 @@ static TaskTracker * ResolveMapTaskTracker(HTAB *trackerHash, Task *task,
TaskExecution *taskExecution);
static TaskTracker * TrackerHashLookup(HTAB *trackerHash, const char *nodeName,
uint32 nodePort);
static void PrepareMasterJobDirectory(Job *workerJob);
/* Local functions forward declarations to manage tasks and their assignments */
static TaskExecStatus ManageTaskExecution(TaskTracker *taskTracker,
@ -179,7 +182,7 @@ MultiTaskTrackerExecute(Job *job)
{
ereport(ERROR, (errmsg("task tracker queries are not allowed while "
"citus.use_secondary_nodes is 'always'"),
errhint("try setting citus.task_executor_type TO 'real-time'")));
errhint("try setting citus.task_executor_type TO 'adaptive'")));
}
/*
@ -3033,3 +3036,19 @@ TaskTrackerExecScan(CustomScanState *node)
return resultSlot;
}
/*
* PrepareMasterJobDirectory creates a directory on the master node to keep job
* execution results. We also register this directory for automatic cleanup on
* portal delete.
*/
static void
PrepareMasterJobDirectory(Job *workerJob)
{
StringInfo jobDirectoryName = MasterJobDirectoryName(workerJob->jobId);
CitusCreateDirectory(jobDirectoryName);
ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner);
ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId);
}

View File

@ -0,0 +1,162 @@
/*-------------------------------------------------------------------------
*
 * placement_access.c
*
* Definitions of the functions used in generating the placement accesses
* for distributed query execution.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "distributed/placement_access.h"
#include "distributed/metadata_cache.h"
static List * BuildPlacementSelectList(int32 groupId, List *relationShardList);
static List * BuildPlacementDDLList(int32 groupId, List *relationShardList);
static List * BuildPlacementAccessList(int32 groupId, List *relationShardList,
ShardPlacementAccessType accessType);
/*
* PlacementAccessListForTask returns a list of placement accesses for a given
* task and task placement.
*/
List *
PlacementAccessListForTask(Task *task, ShardPlacement *taskPlacement)
{
List *placementAccessList = NIL;
List *relationShardList = task->relationShardList;
bool addAnchorAccess = false;
ShardPlacementAccessType accessType = PLACEMENT_ACCESS_SELECT;
if (task->taskType == MODIFY_TASK)
{
/* DML command */
addAnchorAccess = true;
accessType = PLACEMENT_ACCESS_DML;
}
else if (task->taskType == DDL_TASK || task->taskType == VACUUM_ANALYZE_TASK)
{
/* DDL command */
addAnchorAccess = true;
accessType = PLACEMENT_ACCESS_DDL;
}
else if (relationShardList == NIL)
{
/* SELECT query that does not touch any shard placements */
addAnchorAccess = true;
accessType = PLACEMENT_ACCESS_SELECT;
}
if (addAnchorAccess)
{
ShardPlacementAccess *placementAccess =
CreatePlacementAccess(taskPlacement, accessType);
placementAccessList = lappend(placementAccessList, placementAccess);
}
/*
* We've already added anchor shardId's placement access to the list. Now,
* add the other placements in the relationShardList.
*/
if (accessType == PLACEMENT_ACCESS_DDL)
{
/*
 * All relations appearing in inter-shard DDL commands should be marked
* with DDL access.
*/
List *relationShardAccessList =
BuildPlacementDDLList(taskPlacement->groupId, relationShardList);
placementAccessList = list_concat(placementAccessList, relationShardAccessList);
}
else
{
/*
 * In case of SELECTs or DMLs, we add SELECT placement accesses to the
 * elements in relationShardList. For SELECT queries, this is trivial, since
 * the query literally accesses the relationShardList in the same query.
*
* For DMLs, create placement accesses for placements that appear in a
* subselect.
*/
List *relationShardAccessList =
BuildPlacementSelectList(taskPlacement->groupId, relationShardList);
placementAccessList = list_concat(placementAccessList, relationShardAccessList);
}
return placementAccessList;
}
/*
* BuildPlacementSelectList builds a list of SELECT placement accesses
* which can be used to call StartPlacementListConnection or
* GetPlacementListConnection. If the node group does not have a placement
* (e.g. in case of a broadcast join) then the shard is skipped.
*/
static List *
BuildPlacementSelectList(int32 groupId, List *relationShardList)
{
return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_SELECT);
}
/*
 * BuildPlacementDDLList is a wrapper around BuildPlacementAccessList() for DDL access.
*/
static List *
BuildPlacementDDLList(int32 groupId, List *relationShardList)
{
return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_DDL);
}
/*
* BuildPlacementAccessList returns a list of placement accesses for the given
* relationShardList and the access type.
*/
static List *
BuildPlacementAccessList(int32 groupId, List *relationShardList,
ShardPlacementAccessType accessType)
{
ListCell *relationShardCell = NULL;
List *placementAccessList = NIL;
foreach(relationShardCell, relationShardList)
{
RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
ShardPlacement *placement = NULL;
ShardPlacementAccess *placementAccess = NULL;
placement = FindShardPlacementOnGroup(groupId, relationShard->shardId);
if (placement == NULL)
{
continue;
}
placementAccess = CreatePlacementAccess(placement, accessType);
placementAccessList = lappend(placementAccessList, placementAccess);
}
return placementAccessList;
}
/*
* CreatePlacementAccess returns a new ShardPlacementAccess for the given placement
* and access type.
*/
ShardPlacementAccess *
CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType)
{
ShardPlacementAccess *placementAccess = NULL;
placementAccess = (ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess));
placementAccess->placement = placement;
placementAccess->accessType = accessType;
return placementAccess;
}
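To make the shape of these accesses concrete, here is a standalone sketch that mirrors the ShardPlacementAccess structures outside of PostgreSQL. It is an illustration only: calloc stands in for palloc0, a bare struct stands in for Citus' ShardPlacement, and the shard id is made up.

#include <stdio.h>
#include <stdlib.h>

/* mirrors ShardPlacementAccessType / ShardPlacementAccess for illustration */
typedef enum
{
	PLACEMENT_ACCESS_SELECT,
	PLACEMENT_ACCESS_DML,
	PLACEMENT_ACCESS_DDL
} ShardPlacementAccessType;

typedef struct ShardPlacement
{
	long shardId;
} ShardPlacement;

typedef struct ShardPlacementAccess
{
	ShardPlacement *placement;
	ShardPlacementAccessType accessType;
} ShardPlacementAccess;

/* same contract as CreatePlacementAccess, with calloc standing in for palloc0 */
static ShardPlacementAccess *
MakePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType)
{
	ShardPlacementAccess *placementAccess =
		(ShardPlacementAccess *) calloc(1, sizeof(ShardPlacementAccess));
	placementAccess->placement = placement;
	placementAccess->accessType = accessType;
	return placementAccess;
}

int
main(void)
{
	ShardPlacement placement = { .shardId = 102008 }; /* made-up shard id */

	/* a DML task marks its anchor placement with DML access */
	ShardPlacementAccess *placementAccess =
		MakePlacementAccess(&placement, PLACEMENT_ACCESS_DML);

	printf("shard %ld has access type %d\n",
		   placementAccess->placement->shardId, placementAccess->accessType);

	free(placementAccess);
	return 0;
}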

View File

@ -94,21 +94,11 @@ CitusExecutorName(MultiExecutorType executorType)
return "adaptive";
}
case MULTI_EXECUTOR_REAL_TIME:
{
return "real-time";
}
case MULTI_EXECUTOR_TASK_TRACKER:
{
return "task-tracker";
}
case MULTI_EXECUTOR_ROUTER:
{
return "router";
}
case MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT:
{
return "insert-select";

View File

@ -180,15 +180,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
workerNodeList = ActivePrimaryShouldHaveShardsNodeList(NoLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
/*
* Make sure we don't process cancel signals until all shards
 * are created if the adaptive executor is not enabled.
*/
if (TaskExecutorType != MULTI_EXECUTOR_ADAPTIVE)
{
HOLD_INTERRUPTS();
}
workerNodeCount = list_length(workerNodeList);
if (replicationFactor > workerNodeCount)
{
@ -253,18 +244,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnections, colocatedShard);
if (TaskExecutorType != MULTI_EXECUTOR_ADAPTIVE)
{
if (QueryCancelPending)
{
ereport(WARNING, (errmsg(
"cancel requests are ignored during shard creation")));
QueryCancelPending = false;
}
RESUME_INTERRUPTS();
}
}

View File

@ -32,11 +32,9 @@
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/distributed_planner.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_partition.h"

View File

@ -21,12 +21,13 @@
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/connection_management.h"
#include "distributed/distributed_planner.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_router_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"

View File

@ -32,7 +32,7 @@
#include "distributed/distributed_planner.h"
#include "distributed/listutils.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
@ -58,14 +58,6 @@
/* Local functions forward declarations */
static void CreateShardsOnWorkersViaExecutor(Oid distributedRelationId,
List *shardPlacements,
bool useExclusiveConnection, bool
colocatedShard);
static void CreateShardsOnWorkersViaCommands(Oid distributedRelationId,
List *shardPlacements,
bool useExclusiveConnection, bool
colocatedShard);
static List * RelationShardListForShardCreate(ShardInterval *shardInterval);
static bool WorkerShardStats(ShardPlacement *placement, Oid relationId,
char *shardName, uint64 *shardSize,
@ -503,33 +495,12 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
/*
* CreateShardsOnWorkers creates shards on worker nodes given the shard placements
* as a parameter. Function branches into two: either use the executor or execute the
* commands one by one.
 * as a parameter. The function creates the shards via the executor. This means
 * that it can adapt the number of connections required to create the shards.
*/
void
CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection, bool colocatedShard)
{
if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE)
{
CreateShardsOnWorkersViaExecutor(distributedRelationId, shardPlacements,
useExclusiveConnection, colocatedShard);
}
else
{
CreateShardsOnWorkersViaCommands(distributedRelationId, shardPlacements,
useExclusiveConnection, colocatedShard);
}
}
/*
* CreateShardsOnWorkersViaExecutor creates the shards via the executor. This means
* that it can adopt the number of connections required to create the shards.
*/
static void
CreateShardsOnWorkersViaExecutor(Oid distributedRelationId, List *shardPlacements, bool
useExclusiveConnection, bool colocatedShard)
{
bool includeSequenceDefaults = false;
List *ddlCommandList = GetTableDDLEvents(distributedRelationId,
@ -602,125 +573,6 @@ CreateShardsOnWorkersViaExecutor(Oid distributedRelationId, List *shardPlacement
}
/*
* CreateShardsOnWorkersViaCommands creates shards on worker nodes given the shard
 * placements as a parameter. The function opens connections in a transactional way.
 * If the caller needs an exclusive connection (in case of distributing a local table
 * with data on it) or is creating shards in a transaction, a connection is opened
 * per placement.
*/
static void
CreateShardsOnWorkersViaCommands(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection, bool colocatedShard)
{
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedRelationId);
bool includeSequenceDefaults = false;
List *ddlCommandList = GetTableDDLEvents(distributedRelationId,
includeSequenceDefaults);
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(
distributedRelationId);
List *claimedConnectionList = NIL;
ListCell *connectionCell = NULL;
ListCell *shardPlacementCell = NULL;
int connectionFlags = FOR_DDL;
bool partitionTable = PartitionTable(distributedRelationId);
if (useExclusiveConnection)
{
connectionFlags |= CONNECTION_PER_PLACEMENT;
}
BeginOrContinueCoordinatedTransaction();
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
{
CoordinatedTransactionUse2PC();
}
/* mark parallel relation accesses before opening connections */
if (ShouldRecordRelationAccess() && useExclusiveConnection)
{
RecordParallelDDLAccess(distributedRelationId);
/* we should mark the parent as well */
if (partitionTable)
{
Oid parentRelationId = PartitionParentOid(distributedRelationId);
RecordParallelDDLAccess(parentRelationId);
}
}
foreach(shardPlacementCell, shardPlacements)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 shardId = shardPlacement->shardId;
ShardInterval *shardInterval = LoadShardInterval(shardId);
List *relationShardList = NIL;
MultiConnection *connection = NULL;
int shardIndex = -1;
List *commandList = NIL;
if (colocatedShard)
{
shardIndex = ShardIndex(shardInterval);
}
/*
* For partitions, make sure that we mark the parent table relation access
* with DDL. This is only important for parallel relation access in transaction
* blocks, thus check useExclusiveConnection and transaction block as well.
*/
if (ShouldRecordRelationAccess() && useExclusiveConnection && partitionTable)
{
List *placementAccessList = NIL;
relationShardList = RelationShardListForShardCreate(shardInterval);
placementAccessList = BuildPlacementDDLList(shardPlacement->groupId,
relationShardList);
connection = GetPlacementListConnection(connectionFlags, placementAccessList,
NULL);
}
else
{
connection = GetPlacementConnection(connectionFlags, shardPlacement,
NULL);
}
if (useExclusiveConnection)
{
ClaimConnectionExclusively(connection);
claimedConnectionList = lappend(claimedConnectionList, connection);
}
RemoteTransactionBeginIfNecessary(connection);
MarkRemoteTransactionCritical(connection);
commandList = WorkerCreateShardCommandList(distributedRelationId, shardIndex,
shardId,
ddlCommandList,
foreignConstraintCommandList);
ExecuteCriticalRemoteCommandList(connection, commandList);
}
/*
* We need to unclaim all connections to make them usable again for the copy
 * command; otherwise, COPY is going to open new connections to placements and
 * will not see uncommitted changes.
*/
foreach(connectionCell, claimedConnectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
UnclaimConnection(connection);
}
}
/*
* RelationShardListForShardCreate gets a shard interval and returns the placement
* accesses that would happen when a placement of the shard interval is created.

View File

@ -18,8 +18,8 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_router_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "utils/builtins.h"

View File

@ -1040,7 +1040,7 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
/*
* FinalizeNonRouterPlan gets the distributed custom scan plan, and creates the
* final master select plan on the top of this distributed plan for real-time
* final master select plan on the top of this distributed plan for adaptive
* and task-tracker executors.
*/
static PlannedStmt *

View File

@ -18,7 +18,7 @@
* These queries can use nearly all SQL features, but only if they have
* a single-valued filter on the distribution column.
*
* - Real-time queries that can be executed by performing a task for each
* - Multi-shard queries that can be executed by performing a task for each
* shard in a distributed table and performing a merge step.
*
* These queries have limited SQL support. They may only include

View File

@ -32,11 +32,11 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/distributed_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
@ -68,8 +68,10 @@
/* marks shared object as one loadable by the postgres version compiled against */
PG_MODULE_MAGIC;
#define DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE 9999999
static char *CitusVersion = CITUS_VERSION;
void _PG_init(void);
static void ResizeStackToMaximumDepth(void);
@ -78,6 +80,7 @@ static void CreateRequiredDirectories(void);
static void RegisterCitusConfigVariables(void);
static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
GucSource source);
static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source);
static void NormalizeWorkerListPath(void);
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
static void NodeConninfoGucAssignHook(const char *newval, void *extra);
@ -113,7 +116,7 @@ static const struct config_enum_entry replication_model_options[] = {
static const struct config_enum_entry task_executor_type_options[] = {
{ "adaptive", MULTI_EXECUTOR_ADAPTIVE, false },
{ "real-time", MULTI_EXECUTOR_ADAPTIVE, false }, /* ignore real-time executor, always use adaptive */
{ "real-time", DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE, false }, /* keep it for backward comp. */
{ "task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false },
{ NULL, 0, false }
};
@ -1055,7 +1058,7 @@ RegisterCitusConfigVariables(void)
"citus.task_executor_type",
gettext_noop("Sets the executor type to be used for distributed queries."),
gettext_noop("The master node chooses between two different executor types "
"when executing a distributed query.The real-time executor is "
"when executing a distributed query.The adaptive executor is "
"optimal for simple key-value lookup queries and queries that "
"involve aggregations and/or co-located joins on multiple shards. "
"The task-tracker executor is optimal for long-running, complex "
@ -1066,7 +1069,7 @@ RegisterCitusConfigVariables(void)
task_executor_type_options,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
WarnIfDeprecatedExecutorUsed, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_repartition_joins",
@ -1302,6 +1305,27 @@ ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra, GucSource source
}
/*
* WarnIfDeprecatedExecutorUsed prints a warning and sets the config value to
 * the adaptive executor (i.e., it ignores the real-time executor).
*/
static bool
WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source)
{
if (*newval == DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE)
{
ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Ignoring the setting, real-time executor is "
"deprecated")));
/* the adaptive executor is a superset of real-time, so switch to that */
*newval = MULTI_EXECUTOR_ADAPTIVE;
}
return true;
}
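The check-hook contract is simply: inspect *newval, optionally rewrite it, and return true to accept the value. A standalone sketch of that coercion (plain C, not using PostgreSQL's GUC machinery; the macro values below are stand-ins for the real enum):

#include <stdbool.h>
#include <stdio.h>

#define DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE 9999999
#define MULTI_EXECUTOR_ADAPTIVE 1

/* mimics a GUC check hook: may rewrite *newval before it is assigned */
static bool
CheckExecutorType(int *newval)
{
	if (*newval == DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE)
	{
		fprintf(stderr, "WARNING: real-time executor is deprecated\n");
		*newval = MULTI_EXECUTOR_ADAPTIVE;
	}
	return true; /* accept the (possibly rewritten) value */
}

int
main(void)
{
	int executorType = DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE;

	CheckExecutorType(&executorType);
	printf("effective executor: %d\n", executorType); /* prints 1 (adaptive) */
	return 0;
}

In the real hook, this rewrite is what makes SET citus.task_executor_type TO 'real-time' land on the adaptive executor, as the regression test outputs further down show.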
/*
* NormalizeWorkerListPath converts the path configured via
* citus.worker_list_file into an absolute path, falling back to the default

View File

@ -68,8 +68,8 @@
*
* In other words, the following types of queries won't be observed in these
* views:
* - Router queries that are not inside transaction blocks
* - Real-time queries that are not inside transaction blocks
* - Single-shard queries that are not inside transaction blocks
* - Multi-shard select queries that are not inside transaction blocks
* - Task-tracker queries
*
*

View File

@ -1,285 +0,0 @@
/*-------------------------------------------------------------------------
*
* multi_shard_transaction.c
* This file contains functions for managing 1PC or 2PC transactions
* across many shard placements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "libpq-fe.h"
#include "postgres.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/placement_connection.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
#include "nodes/pg_list.h"
#include "storage/ipc.h"
#include "utils/memutils.h"
#define INITIAL_SHARD_CONNECTION_HASH_SIZE 128
/*
* OpenTransactionsForAllTasks opens a connection for each task,
* taking into account which shards are read and modified by the task
 * to select the appropriate connection, or error out if no appropriate
* connection can be found. The set of connections is returned as an
* anchor shard ID -> ShardConnections hash.
*/
HTAB *
OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
{
HTAB *shardConnectionHash = NULL;
ListCell *taskCell = NULL;
List *newConnectionList = NIL;
shardConnectionHash = CreateShardConnectionHash(CurrentMemoryContext);
connectionFlags |= CONNECTION_PER_PLACEMENT;
/* open connections to shards which don't have connections yet */
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
ShardPlacementAccessType accessType = PLACEMENT_ACCESS_SELECT;
uint64 shardId = task->anchorShardId;
ShardConnections *shardConnections = NULL;
bool shardConnectionsFound = false;
List *shardPlacementList = NIL;
ListCell *placementCell = NULL;
shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
&shardConnectionsFound);
if (shardConnectionsFound)
{
continue;
}
shardPlacementList = FinalizedShardPlacementList(shardId);
if (shardPlacementList == NIL)
{
/* going to have to have some placements to do any work */
ereport(ERROR, (errmsg("could not find any shard placements for the shard "
UINT64_FORMAT, shardId)));
}
if (task->taskType == MODIFY_TASK)
{
accessType = PLACEMENT_ACCESS_DML;
}
else
{
/* can only open connections for DDL and DML commands */
Assert(task->taskType == DDL_TASK || task->taskType == VACUUM_ANALYZE_TASK);
accessType = PLACEMENT_ACCESS_DDL;
}
foreach(placementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
ShardPlacementAccess placementModification;
List *placementAccessList = NIL;
MultiConnection *connection = NULL;
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
shardPlacement->nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("could not find worker node %s:%d",
shardPlacement->nodeName,
shardPlacement->nodePort)));
}
/* add placement access for modification */
placementModification.placement = shardPlacement;
placementModification.accessType = accessType;
placementAccessList = lappend(placementAccessList, &placementModification);
if (accessType == PLACEMENT_ACCESS_DDL)
{
List *placementDDLList = BuildPlacementDDLList(shardPlacement->groupId,
task->relationShardList);
/*
 * All relations appearing in inter-shard DDL commands should be marked
* with DDL access.
*/
placementAccessList = list_concat(placementAccessList, placementDDLList);
}
else
{
List *placementSelectList =
BuildPlacementSelectList(shardPlacement->groupId,
task->relationShardList);
/* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */
placementAccessList =
list_concat(placementAccessList, placementSelectList);
}
/*
* Find a connection that sees preceding writes and cannot self-deadlock,
* or error out if no such connection exists.
*/
connection = StartPlacementListConnection(connectionFlags,
placementAccessList, NULL);
ClaimConnectionExclusively(connection);
shardConnections->connectionList = lappend(shardConnections->connectionList,
connection);
newConnectionList = lappend(newConnectionList, connection);
/*
 * Every individual failure should cause the entire distributed
* transaction to fail.
*/
MarkRemoteTransactionCritical(connection);
}
}
/* finish connection establishment for newly opened connections */
FinishConnectionListEstablishment(newConnectionList);
/* the special BARE mode (for e.g. VACUUM/ANALYZE) skips BEGIN */
if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE)
{
RemoteTransactionsBeginIfNecessary(newConnectionList);
}
return shardConnectionHash;
}
/*
* CreateShardConnectionHash constructs a hash table which maps from shard
* identifier to connection lists, passing the provided MemoryContext to
* hash_create for hash allocations.
*/
HTAB *
CreateShardConnectionHash(MemoryContext memoryContext)
{
HTAB *shardConnectionsHash = NULL;
int hashFlags = 0;
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardConnections);
info.hcxt = memoryContext;
hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
shardConnectionsHash = hash_create("Shard Connections Hash",
INITIAL_SHARD_CONNECTION_HASH_SIZE, &info,
hashFlags);
return shardConnectionsHash;
}
/*
* GetShardHashConnections finds existing connections for a shard in the
* provided hash. If not found, then a ShardConnections structure with empty
* connectionList is returned.
*/
ShardConnections *
GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound)
{
ShardConnections *shardConnections = NULL;
shardConnections = (ShardConnections *) hash_search(connectionHash, &shardId,
HASH_ENTER, connectionsFound);
if (!*connectionsFound)
{
shardConnections->shardId = shardId;
shardConnections->connectionList = NIL;
}
return shardConnections;
}
/*
* ShardConnectionList returns the list of ShardConnections in connectionHash.
*/
List *
ShardConnectionList(HTAB *connectionHash)
{
List *shardConnectionsList = NIL;
HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL;
if (connectionHash == NULL)
{
return NIL;
}
hash_seq_init(&status, connectionHash);
shardConnections = (ShardConnections *) hash_seq_search(&status);
while (shardConnections != NULL)
{
shardConnectionsList = lappend(shardConnectionsList, shardConnections);
shardConnections = (ShardConnections *) hash_seq_search(&status);
}
return shardConnectionsList;
}
/*
* ResetShardPlacementTransactionState performs cleanup after the end of a
* transaction.
*/
void
ResetShardPlacementTransactionState(void)
{
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
{
MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
}
}
/*
* UnclaimAllShardConnections unclaims all connections in the given
* shard connections hash after previously claiming them exclusively
 * in OpenTransactionsForAllTasks.
*/
void
UnclaimAllShardConnections(HTAB *shardConnectionHash)
{
HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL;
hash_seq_init(&status, shardConnectionHash);
while ((shardConnections = hash_seq_search(&status)) != 0)
{
List *connectionList = shardConnections->connectionList;
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
UnclaimConnection(connection);
}
}
}

View File

@ -25,7 +25,6 @@
#include "distributed/intermediate_results.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "distributed/subplan_execution.h"
@ -43,6 +42,24 @@ int MultiShardCommitProtocol = COMMIT_PROTOCOL_2PC;
int SingleShardCommitProtocol = COMMIT_PROTOCOL_2PC;
int SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
/*
* GUC that determines whether a SELECT in a transaction block should also run in
* a transaction block on the worker even if no writes have occurred yet.
*/
bool SelectOpensTransactionBlock = true;
/* controls use of locks to enforce safe commutativity */
bool AllModificationsCommutative = false;
/* we've deprecated this flag; keeping it here for some time so as not to break existing users */
bool EnableDeadlockPrevention = true;
/* number of nested stored procedure call levels we are currently in */
int StoredProcedureLevel = 0;
/* number of nested DO block levels we are currently in */
int DoBlockLevel = 0;
/* state needed to keep track of operations used during a transaction */
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
@ -88,6 +105,7 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
SubTransactionId parentSubid, void *arg);
/* remaining functions */
static void ResetShardPlacementTransactionState(void);
static void AdjustMaxPreparedTransactions(void);
static void PushSubXact(SubTransactionId subId);
static void PopSubXact(SubTransactionId subId);
@ -406,6 +424,21 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
}
/*
* ResetShardPlacementTransactionState performs cleanup after the end of a
* transaction.
*/
static void
ResetShardPlacementTransactionState(void)
{
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
{
MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
}
}
/*
* Subtransaction callback - currently only used to remember whether a
* savepoint has been rolled back, as we don't support that.

View File

@ -22,7 +22,6 @@
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/resource_lock.h"
#include "distributed/remote_commands.h"
#include "distributed/pg_dist_node.h"

View File

@ -279,11 +279,9 @@ CopyNodeTaskExecution(COPYFUNC_ARGS)
COPY_SCALAR_ARRAY(connectionIdArray, int32, from->nodeCount);
COPY_SCALAR_ARRAY(fileDescriptorArray, int32, from->nodeCount);
COPY_SCALAR_FIELD(connectStartTime);
COPY_SCALAR_FIELD(currentNodeIndex);
COPY_SCALAR_FIELD(querySourceNodeIndex);
COPY_SCALAR_FIELD(failureCount);
COPY_SCALAR_FIELD(criticalErrorOccurred);
}

View File

@ -494,11 +494,9 @@ OutTaskExecution(OUTFUNC_ARGS)
WRITE_INT_ARRAY(connectionIdArray, node->nodeCount);
WRITE_INT_ARRAY(fileDescriptorArray, node->nodeCount);
WRITE_INT64_FIELD(connectStartTime);
WRITE_UINT_FIELD(currentNodeIndex);
WRITE_UINT_FIELD(querySourceNodeIndex);
WRITE_UINT_FIELD(failureCount);
WRITE_BOOL_FIELD(criticalErrorOccurred);
}

View File

@ -26,9 +26,9 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/distributed_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/relay_utility.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
@ -694,50 +694,6 @@ LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode)
}
/*
* LockPartitionsInRelationList iterates over given list and acquires locks on
* partitions of each partitioned table. It does nothing for non-partitioned tables.
*/
void
LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode)
{
ListCell *relationIdCell = NULL;
foreach(relationIdCell, relationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
if (PartitionedTable(relationId))
{
LockPartitionRelations(relationId, lockmode);
}
}
}
/*
* LockPartitionRelations acquires relation lock on all partitions of given
* partitioned relation. This function expects that given relation is a
* partitioned relation.
*/
void
LockPartitionRelations(Oid relationId, LOCKMODE lockMode)
{
/*
* PartitionList function generates partition list in the same order
 * as PostgreSQL does. Therefore we do not need to sort it before acquiring
* locks.
*/
List *partitionList = PartitionList(relationId);
ListCell *partitionCell = NULL;
foreach(partitionCell, partitionList)
{
Oid partitionRelationId = lfirst_oid(partitionCell);
LockRelationOid(partitionRelationId, lockMode);
}
}
/*
* LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE.
* The function errors out if the input lock mode isn't defined in the PostgreSQL's

View File

@ -574,86 +574,6 @@ TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName
}
/*
* ExecuteRemoteQuery executes the given query, copies the query's results to a
* sorted list, and returns this list. The function assumes that query results
* have a single column, and asserts on that assumption. If results are empty,
* or an error occurs during query runtime, the function returns an empty list.
* If asUser is NULL the connection is established as the current user,
* otherwise as the specified user.
*/
List *
ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
StringInfo queryString)
{
int32 connectionId = -1;
bool querySent = false;
bool queryReady = false;
bool queryOK = false;
void *queryResult = NULL;
int rowCount = 0;
int rowIndex = 0;
int columnCount = 0;
List *resultList = NIL;
connectionId = MultiClientConnect(nodeName, nodePort, NULL, runAsUser);
if (connectionId == INVALID_CONNECTION_ID)
{
return NIL;
}
querySent = MultiClientSendQuery(connectionId, queryString->data);
if (!querySent)
{
MultiClientDisconnect(connectionId);
return NIL;
}
while (!queryReady)
{
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY)
{
queryReady = true;
}
else if (resultStatus == CLIENT_RESULT_BUSY)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
pg_usleep(sleepIntervalPerCycle);
}
else
{
MultiClientDisconnect(connectionId);
return NIL;
}
}
queryOK = MultiClientQueryResult(connectionId, &queryResult, &rowCount, &columnCount);
if (!queryOK)
{
MultiClientDisconnect(connectionId);
return NIL;
}
for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
const int columnIndex = 0;
char *rowValue = MultiClientGetValue(queryResult, rowIndex, columnIndex);
StringInfo rowValueString = makeStringInfo();
appendStringInfoString(rowValueString, rowValue);
Assert(columnCount == 1);
resultList = lappend(resultList, rowValueString);
}
MultiClientClearResult(queryResult);
MultiClientDisconnect(connectionId);
return resultList;
}
/*
* Parses the given DDL command, and returns the tree node for parsed command.
*/

View File

@ -28,9 +28,7 @@ typedef struct CitusScanState
/* custom scan methods for all executors */
extern CustomScanMethods AdaptiveExecutorCustomScanMethods;
extern CustomScanMethods RealTimeCustomScanMethods;
extern CustomScanMethods TaskTrackerCustomScanMethods;
extern CustomScanMethods RouterCustomScanMethods;
extern CustomScanMethods CoordinatorInsertSelectCustomScanMethods;
extern CustomScanMethods DelayedErrorCustomScanMethods;

View File

@ -205,7 +205,6 @@ extern long DeadlineTimestampTzToTimeout(TimestampTz deadline);
/* dealing with notice handler */
extern void SetCitusNoticeProcessor(MultiConnection *connection);
extern void SetCitusNoticeLevel(int level);
extern char * TrimLogLevel(const char *message);
extern void UnsetCitusNoticeLevel(void);

View File

@ -0,0 +1,25 @@
/*-------------------------------------------------------------------------
*
* distributed_execution_locks.h
* Locking Infrastructure for distributed execution.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef DISTRIBUTED_EXECUTION_LOCKS_H
#define DISTRIBUTED_EXECUTION_LOCKS_H
#include "postgres.h"
#include "nodes/pg_list.h"
#include "storage/lockdefs.h"
#include "distributed/multi_physical_planner.h"
extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel);
extern void AcquireExecutorMultiShardLocks(List *taskList);
extern void AcquireMetadataLocks(List *taskList);
extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode);
extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);
#endif /* DISTRIBUTED_EXECUTION_LOCKS_H */

View File

@ -114,35 +114,19 @@ extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
const char *nodeDatabase, const char *nodeUser);
extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
const char *nodeDatabase, const char *nodeUser);
extern int32 MultiClientPlacementConnectStart(List *placementAccessList,
const char *userName);
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
extern MultiConnection * MultiClientGetConnection(int32 connectionId);
extern void MultiClientDisconnect(int32 connectionId);
extern void MultiClientReleaseConnection(int32 connectionId);
extern bool MultiClientConnectionUp(int32 connectionId);
extern bool MultiClientExecute(int32 connectionId, const char *query, void **queryResult,
int *rowCount, int *columnCount);
extern bool MultiClientSendQuery(int32 connectionId, const char *query);
extern bool MultiClientCancel(int32 connectionId);
extern ResultStatus MultiClientResultStatus(int32 connectionId);
extern QueryStatus MultiClientQueryStatus(int32 connectionId);
extern CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor,
uint64 *returnBytesReceived);
extern bool MultiClientQueryResult(int32 connectionId, void **queryResult,
int *rowCount, int *columnCount);
extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult,
int *rowCount, int *columnCount);
extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex);
extern bool MultiClientValueIsNull(void *queryResult, int rowIndex, int columnIndex);
extern void MultiClientClearResult(void *queryResult);
extern WaitInfo * MultiClientCreateWaitInfo(int maxConnections);
extern void MultiClientResetWaitInfo(WaitInfo *waitInfo);
extern void MultiClientFreeWaitInfo(WaitInfo *waitInfo);
extern void MultiClientRegisterWait(WaitInfo *waitInfo, TaskExecutionStatus waitStatus,
int32 connectionId);
extern void MultiClientWait(WaitInfo *waitInfo);
#endif /* MULTI_CLIENT_EXECUTOR_H */

View File

@ -25,13 +25,13 @@ typedef enum
PARALLEL_CONNECTION = 0,
SEQUENTIAL_CONNECTION = 1
} MultiShardConnectionTypes;
extern int MultiShardConnectionType;
extern bool WritableStandbyCoordinator;
extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize;
extern int ExecutorSlowStartInterval;
extern bool SortReturning;
extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
@ -62,6 +62,10 @@ extern void SetLocalMultiShardModifyModeToSequential(void);
extern void SetLocalForceMaxQueryParallelization(void);
extern void SortTupleStore(CitusScanState *scanState);
extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool
useOriginalCustomTypeOids);
#endif /* MULTI_EXECUTOR_H */

View File

@ -1,73 +0,0 @@
/*
* multi_router_executor.h
*
 * Function declarations used in executing a distributed execution
* plan.
*
*/
#ifndef MULTI_ROUTER_EXECUTOR_H_
#define MULTI_ROUTER_EXECUTOR_H_
#include "c.h"
#include "access/sdir.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/placement_connection.h"
#include "executor/execdesc.h"
#include "executor/tuptable.h"
#include "nodes/pg_list.h"
/*
* XactShardConnSet keeps track of the mapping from shard to the set of nodes
* involved in multi-statement transaction-wrapped modifications of that shard.
* This information is used to mark placements inactive at transaction close.
*/
typedef struct XactShardConnSet
{
uint64 shardId; /* identifier of the shard that was modified */
List *connectionEntryList; /* NodeConnectionEntry pointers to participating nodes */
} XactShardConnSet;
/* Config variables managed via guc.c */
extern bool AllModificationsCommutative;
extern bool EnableDeadlockPrevention;
extern bool SortReturning;
extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
extern TupleTableSlot * RouterModifyExecScan(CustomScanState *node);
extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults);
int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
RowModifyLevel modLevel, bool hasReturning);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
RowModifyLevel modLevel);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
/* helper functions */
extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel);
extern void AcquireExecutorMultiShardLocks(List *taskList);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
extern bool TaskListRequires2PC(List *taskList);
extern bool ReadOnlyTask(TaskType taskType);
extern List * BuildPlacementSelectList(int32 groupId, List *relationShardList);
extern List * BuildPlacementDDLList(int32 groupId, List *relationShardList);
extern void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool
useOriginalCustomTypeOids);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -24,8 +24,6 @@
#define RESERVED_FD_COUNT 64 /* file descriptors unavailable to executor */
/* copy out query results */
#define COPY_QUERY_TO_STDOUT_TEXT "COPY (%s) TO STDOUT"
#define COPY_QUERY_TO_STDOUT_BINARY "COPY (%s) TO STDOUT WITH (FORMAT binary)"
#define EXECUTE_SQL_TASK_TO_FILE_BINARY \
"SELECT worker_execute_sql_task("UINT64_FORMAT ", %u, %s, true)"
#define EXECUTE_SQL_TASK_TO_FILE_TEXT \
@ -42,26 +40,15 @@
/* Enumeration to track one task's execution status */
typedef enum
{
EXEC_TASK_INVALID_FIRST = 0,
EXEC_TASK_CONNECT_START = 1,
EXEC_TASK_CONNECT_POLL = 2,
EXEC_TASK_FAILED = 3,
EXEC_COMPUTE_TASK_START = 4,
EXEC_COMPUTE_TASK_RUNNING = 5,
EXEC_COMPUTE_TASK_COPYING = 6,
EXEC_TASK_DONE = 7,
/* used for task tracker executor */
EXEC_TASK_UNASSIGNED = 11,
EXEC_TASK_QUEUED = 12,
EXEC_TASK_TRACKER_RETRY = 13,
EXEC_TASK_TRACKER_FAILED = 14,
EXEC_SOURCE_TASK_TRACKER_RETRY = 15,
EXEC_SOURCE_TASK_TRACKER_FAILED = 16,
/* transactional operations */
EXEC_BEGIN_START = 20,
EXEC_BEGIN_RUNNING = 21
EXEC_TASK_INVALID_FIRST = 0,
EXEC_TASK_DONE = 1,
EXEC_TASK_UNASSIGNED = 2,
EXEC_TASK_QUEUED = 3,
EXEC_TASK_TRACKER_RETRY = 4,
EXEC_TASK_TRACKER_FAILED = 5,
EXEC_SOURCE_TASK_TRACKER_RETRY = 6,
EXEC_SOURCE_TASK_TRACKER_FAILED = 7,
} TaskExecStatus;
@ -94,22 +81,11 @@ typedef enum
{
MULTI_EXECUTOR_INVALID_FIRST = 0,
MULTI_EXECUTOR_ADAPTIVE = 1,
MULTI_EXECUTOR_REAL_TIME = 2,
MULTI_EXECUTOR_TASK_TRACKER = 3,
MULTI_EXECUTOR_ROUTER = 4,
MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT = 5
MULTI_EXECUTOR_TASK_TRACKER = 2,
MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT = 3
} MultiExecutorType;
/* Enumeration that represents a (dis)connect action taken */
typedef enum
{
CONNECT_ACTION_NONE = 0,
CONNECT_ACTION_OPENED = 1,
CONNECT_ACTION_CLOSED = 2
} ConnectAction;
/*
* DistributedExecutionStats holds the execution related stats.
*
@ -124,10 +100,8 @@ typedef struct DistributedExecutionStats
/*
* TaskExecution holds state that relates to a task's execution. In the case of
* the real-time executor, this struct encapsulates all information necessary to
* run the task. The task tracker executor however manages its connection logic
* elsewhere, and doesn't use connection related fields defined in here.
* TaskExecution holds state that relates to a task's execution for task-tracker
* executor.
*/
struct TaskExecution
{
@ -139,12 +113,10 @@ struct TaskExecution
TransmitExecStatus *transmitStatusArray;
int32 *connectionIdArray;
int32 *fileDescriptorArray;
TimestampTz connectStartTime;
uint32 nodeCount;
uint32 currentNodeIndex;
uint32 querySourceNodeIndex; /* only applies to map fetch tasks */
uint32 failureCount;
bool criticalErrorOccurred;
};
@ -187,18 +159,6 @@ typedef struct TaskTracker
} TaskTracker;
/*
* WorkerNodeState keeps state for a worker node. The real-time executor uses this to
* keep track of the number of open connections to a worker node.
*/
typedef struct WorkerNodeState
{
uint32 workerPort;
char workerName[WORKER_LENGTH];
uint32 openConnectionCount;
} WorkerNodeState;
/* Config variable managed via guc.c */
extern int RemoteTaskCheckInterval;
extern int MaxAssignTaskBatchSize;
@ -209,7 +169,6 @@ extern int MultiTaskQueryLogLevel;
/* Function declarations for distributed execution */
extern void MultiRealTimeExecute(Job *job);
extern void MultiTaskTrackerExecute(Job *job);
/* Function declarations common to more than one executor */
@ -222,10 +181,8 @@ extern void ErrorSizeLimitIsExceeded(void);
extern bool TaskExecutionFailed(TaskExecution *taskExecution);
extern void AdjustStateForFailure(TaskExecution *taskExecution);
extern int MaxMasterConnectionCount(void);
extern void PrepareMasterJobDirectory(Job *workerJob);
extern TupleTableSlot * RealTimeExecScan(CustomScanState *node);
extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node);
#endif /* MULTI_SERVER_EXECUTOR_H */

View File

@ -1,39 +0,0 @@
/*-------------------------------------------------------------------------
*
* multi_shard_transaction.h
* Type and function declarations used in performing transactions across
* shard placements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef MULTI_SHARD_TRANSACTION_H
#define MULTI_SHARD_TRANSACTION_H
#include "utils/hsearch.h"
#include "nodes/pg_list.h"
/* ShardConnections represents a set of connections for each placement of a shard */
typedef struct ShardConnections
{
int64 shardId;
/* list of MultiConnection structs */
List *connectionList;
} ShardConnections;
extern HTAB * OpenTransactionsForAllTasks(List *taskList, int connectionFlags);
extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
bool *connectionsFound);
extern List * ShardConnectionList(HTAB *connectionHash);
extern void ResetShardPlacementTransactionState(void);
extern void UnclaimAllShardConnections(HTAB *shardConnectionHash);
#endif /* MULTI_SHARD_TRANSACTION_H */

View File

@ -0,0 +1,48 @@
/*-------------------------------------------------------------------------
*
* placement_access.h
* Declarations of the structs and functions used in generating the
* placement accesses for distributed query execution.
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef PLACEMENT_ACCESS_H
#define PLACEMENT_ACCESS_H
#include "postgres.h"
#include "nodes/pg_list.h"
#include "distributed/multi_physical_planner.h"
/* forward declare, to avoid dependency on ShardPlacement definition */
struct ShardPlacement;
/* represents the way in which a placement is accessed */
typedef enum ShardPlacementAccessType
{
/* read from placement */
PLACEMENT_ACCESS_SELECT,
/* modify rows in placement */
PLACEMENT_ACCESS_DML,
/* modify placement schema */
PLACEMENT_ACCESS_DDL
} ShardPlacementAccessType;
/* represents access to a placement */
typedef struct ShardPlacementAccess
{
/* placement that is accessed */
struct ShardPlacement *placement;
/* the way in which the placement is accessed */
ShardPlacementAccessType accessType;
} ShardPlacementAccess;
extern List * PlacementAccessListForTask(Task *task, ShardPlacement *taskPlacement);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
#endif /* PLACEMENT_ACCESS_H */

View File

@ -11,32 +11,7 @@
#include "distributed/connection_management.h"
/* forward declare, to avoid dependency on ShardPlacement definition */
struct ShardPlacement;
/* represents the way in which a placement is accessed */
typedef enum ShardPlacementAccessType
{
/* read from placement */
PLACEMENT_ACCESS_SELECT,
/* modify rows in placement */
PLACEMENT_ACCESS_DML,
/* modify placement schema */
PLACEMENT_ACCESS_DDL
} ShardPlacementAccessType;
/* represents access to a placement */
typedef struct ShardPlacementAccess
{
/* placement that is accessed */
struct ShardPlacement *placement;
/* the way in which the placement is accessed */
ShardPlacementAccessType accessType;
} ShardPlacementAccess;
#include "distributed/placement_access.h"
extern MultiConnection * GetPlacementConnection(uint32 flags,
@ -48,9 +23,6 @@ extern MultiConnection * StartPlacementConnection(uint32 flags,
extern MultiConnection * GetConnectionIfPlacementAccessedInXact(int flags,
List *placementAccessList,
const char *userName);
extern MultiConnection * GetPlacementListConnection(uint32 flags,
List *placementAccessList,
const char *userName);
extern MultiConnection * StartPlacementListConnection(uint32 flags,
List *placementAccessList,
const char *userName);

View File

@ -101,10 +101,6 @@ extern void SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lock
extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode);
extern List * GetSortedReferenceShardIntervals(List *relationList);
/* Lock partitions of partitioned table */
extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode);
extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);
/* Lock parent table's colocated shard resource */
extern void LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode);

View File

@ -88,6 +88,12 @@ extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
/* list of connections that are part of the current coordinated transaction */
extern dlist_head InProgressTransactions;
/* controls use of locks to enforce safe commutativity */
extern bool AllModificationsCommutative;
/* we've deprecated this flag; keeping it here for some time so as not to break existing users */
extern bool EnableDeadlockPrevention;
/* number of nested stored procedure call levels we are currently in */
extern int StoredProcedureLevel;

View File

@ -134,8 +134,6 @@ extern int64 WorkerExecuteSqlTask(Query *query, char *taskFilename,
/* Function declarations shared with the master planner */
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);
extern StringInfo UserTaskFilename(StringInfo directoryName, uint32 taskId);
extern List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
StringInfo queryString);
extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList);
extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionList);
extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename);

View File

@ -21,7 +21,7 @@ SELECT * FROM tab;
SET citus.task_executor_type TO 'task-tracker';
SELECT * FROM tab;
ERROR: task tracker queries are not allowed while citus.use_secondary_nodes is 'always'
HINT: try setting citus.task_executor_type TO 'real-time'
HINT: try setting citus.task_executor_type TO 'adaptive'
-- clean up
\c - - - :master_port
DROP TABLE tab;

View File

@ -371,8 +371,8 @@ WHERE
125
(1 row)
-- set task_executor back to real-time
SET citus.task_executor_type TO "real-time";
-- set task_executor back to adaptive
SET citus.task_executor_type TO "adaptive";
-- connect to the master and do some test
-- regarding DDL support on schemas where
-- the search_path is set

View File

@ -1097,7 +1097,7 @@ LOG: join order: [ "colocated_table_test" ][ reference join "reference_table_te
SET citus.log_multi_join_order TO FALSE;
SET citus.shard_count TO DEFAULT;
SET citus.task_executor_type to "real-time";
SET citus.task_executor_type to "adaptive";
-- some INSERT .. SELECT queries that involve both hash distributed and reference tables
-- should go via coordinator since we're inserting into reference table where
-- not all the participants are reference tables

View File

@ -987,8 +987,8 @@ WHERE
14
(1 row)
-- set task_executor back to real-time
SET citus.task_executor_type TO "real-time";
-- set task_executor back to adaptive
SET citus.task_executor_type TO "adaptive";
-- test ALTER TABLE SET SCHEMA
-- we expect that it will warn out
SET search_path TO public;
@ -1074,7 +1074,7 @@ SELECT sum(result::int) FROM run_command_on_shards('run_test_schema.test_table',
(1 row)
-- test capital letters on both table and schema names
SET citus.task_executor_type to "real-time";
SET citus.task_executor_type to "adaptive";
-- create schema with weird names
CREATE SCHEMA "CiTuS.TeeN";
CREATE SCHEMA "CiTUS.TEEN2";

View File

@ -411,7 +411,7 @@ SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
-- now, test the cases where Citus do or do not need to create
-- the master queries
SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'real-time';
SET citus.task_executor_type TO 'adaptive';
-- start with the simple lookup query
SELECT *
FROM articles

View File

@ -355,7 +355,7 @@ SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
-- now, test the cases where Citus do or do not need to create
-- the master queries
SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'real-time';
SET citus.task_executor_type TO 'adaptive';
-- start with the simple lookup query
SELECT *
FROM articles

View File

@ -217,8 +217,8 @@ FROM
WHERE
n1.n_regionkey = n2.n_regionkey;
-- set task_executor back to real-time
SET citus.task_executor_type TO "real-time";
-- set task_executor back to adaptive
SET citus.task_executor_type TO "adaptive";
-- connect to the master and do some test
-- regarding DDL support on schemas where

View File

@ -682,7 +682,7 @@ WHERE
SET citus.log_multi_join_order TO FALSE;
SET citus.shard_count TO DEFAULT;
SET citus.task_executor_type to "real-time";
SET citus.task_executor_type to "adaptive";
-- some INSERT .. SELECT queries that involve both hash distributed and reference tables

View File

@ -724,8 +724,8 @@ FROM
WHERE
n1.n_regionkey = n2.n_regionkey;
-- set task_executor back to real-time
SET citus.task_executor_type TO "real-time";
-- set task_executor back to adaptive
SET citus.task_executor_type TO "adaptive";
-- test ALTER TABLE SET SCHEMA
@ -769,7 +769,7 @@ SELECT sum(result::int) FROM run_command_on_placements('run_test_schema.test_tab
SELECT sum(result::int) FROM run_command_on_shards('run_test_schema.test_table','SELECT pg_table_size(''%s'')');
-- test capital letters on both table and schema names
SET citus.task_executor_type to "real-time";
SET citus.task_executor_type to "adaptive";
-- create schema with weird names
CREATE SCHEMA "CiTuS.TeeN";
CREATE SCHEMA "CiTUS.TEEN2";

View File

@ -217,7 +217,7 @@ SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
-- now, test the cases where Citus do or do not need to create
-- the master queries
SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'real-time';
SET citus.task_executor_type TO 'adaptive';
-- start with the simple lookup query
SELECT *