Merge pull request #1814 from citusdata/rename_multiplan

Rename MultiPlan to DistributedPlan
pull/1824/head
Marco Slot 2017-11-22 09:57:55 +01:00 committed by GitHub
commit 64a5d5da22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 273 additions and 253 deletions

View File

@ -17,7 +17,7 @@
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/memutils.h" #include "utils/memutils.h"

View File

@ -16,7 +16,7 @@
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "executor/executor.h" #include "executor/executor.h"
@ -55,10 +55,10 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan) if (!scanState->finishedRemoteScan)
{ {
EState *executorState = scanState->customScanState.ss.ps.state; EState *executorState = scanState->customScanState.ss.ps.state;
MultiPlan *multiPlan = scanState->multiPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *selectQuery = multiPlan->insertSelectSubquery; Query *selectQuery = distributedPlan->insertSelectSubquery;
List *insertTargetList = multiPlan->insertTargetList; List *insertTargetList = distributedPlan->insertTargetList;
Oid targetRelationId = multiPlan->targetRelationId; Oid targetRelationId = distributedPlan->targetRelationId;
ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator"))); ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator")));

View File

@ -20,7 +20,7 @@
#include "distributed/multi_copy.h" #include "distributed/multi_copy.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_master_planner.h" #include "distributed/multi_master_planner.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/multi_resowner.h" #include "distributed/multi_resowner.h"
@ -115,7 +115,7 @@ RealTimeCreateScan(CustomScan *scan)
scanState->executorType = MULTI_EXECUTOR_REAL_TIME; scanState->executorType = MULTI_EXECUTOR_REAL_TIME;
scanState->customScanState.ss.ps.type = T_CustomScanState; scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->multiPlan = GetMultiPlan(scan); scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &RealTimeCustomExecMethods; scanState->customScanState.methods = &RealTimeCustomExecMethods;
@ -133,7 +133,7 @@ TaskTrackerCreateScan(CustomScan *scan)
scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER; scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER;
scanState->customScanState.ss.ps.type = T_CustomScanState; scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->multiPlan = GetMultiPlan(scan); scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &TaskTrackerCustomExecMethods; scanState->customScanState.methods = &TaskTrackerCustomExecMethods;
@ -148,20 +148,20 @@ Node *
RouterCreateScan(CustomScan *scan) RouterCreateScan(CustomScan *scan)
{ {
CitusScanState *scanState = palloc0(sizeof(CitusScanState)); CitusScanState *scanState = palloc0(sizeof(CitusScanState));
MultiPlan *multiPlan = NULL; DistributedPlan *distributedPlan = NULL;
Job *workerJob = NULL; Job *workerJob = NULL;
List *taskList = NIL; List *taskList = NIL;
bool isModificationQuery = false; bool isModificationQuery = false;
scanState->executorType = MULTI_EXECUTOR_ROUTER; scanState->executorType = MULTI_EXECUTOR_ROUTER;
scanState->customScanState.ss.ps.type = T_CustomScanState; scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->multiPlan = GetMultiPlan(scan); scanState->distributedPlan = GetDistributedPlan(scan);
multiPlan = scanState->multiPlan; distributedPlan = scanState->distributedPlan;
workerJob = multiPlan->workerJob; workerJob = distributedPlan->workerJob;
taskList = workerJob->taskList; taskList = workerJob->taskList;
isModificationQuery = IsModifyMultiPlan(multiPlan); isModificationQuery = IsModifyDistributedPlan(distributedPlan);
/* check whether query has at most one shard */ /* check whether query has at most one shard */
if (list_length(taskList) <= 1) if (list_length(taskList) <= 1)
@ -180,7 +180,7 @@ RouterCreateScan(CustomScan *scan)
Assert(isModificationQuery); Assert(isModificationQuery);
if (IsMultiRowInsert(workerJob->jobQuery) || if (IsMultiRowInsert(workerJob->jobQuery) ||
(IsUpdateOrDelete(multiPlan) && (IsUpdateOrDelete(distributedPlan) &&
MultiShardConnectionType == SEQUENTIAL_CONNECTION)) MultiShardConnectionType == SEQUENTIAL_CONNECTION))
{ {
/* /*
@ -211,7 +211,7 @@ CoordinatorInsertSelectCreateScan(CustomScan *scan)
scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
scanState->customScanState.ss.ps.type = T_CustomScanState; scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->multiPlan = GetMultiPlan(scan); scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods; scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods;
@ -231,10 +231,10 @@ CoordinatorInsertSelectCreateScan(CustomScan *scan)
Node * Node *
DelayedErrorCreateScan(CustomScan *scan) DelayedErrorCreateScan(CustomScan *scan)
{ {
MultiPlan *multiPlan = GetMultiPlan(scan); DistributedPlan *distributedPlan = GetDistributedPlan(scan);
/* raise the deferred error */ /* raise the deferred error */
RaiseDeferredError(multiPlan->planningError, ERROR); RaiseDeferredError(distributedPlan->planningError, ERROR);
return NULL; return NULL;
} }
@ -264,11 +264,11 @@ RealTimeExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan) if (!scanState->finishedRemoteScan)
{ {
MultiPlan *multiPlan = scanState->multiPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = multiPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
/* we are taking locks on partitions of partitioned tables */ /* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
PrepareMasterJobDirectory(workerJob); PrepareMasterJobDirectory(workerJob);
MultiRealTimeExecute(workerJob); MultiRealTimeExecute(workerJob);
@ -464,11 +464,11 @@ TaskTrackerExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan) if (!scanState->finishedRemoteScan)
{ {
MultiPlan *multiPlan = scanState->multiPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = multiPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
/* we are taking locks on partitions of partitioned tables */ /* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
PrepareMasterJobDirectory(workerJob); PrepareMasterJobDirectory(workerJob);
MultiTaskTrackerExecute(workerJob); MultiTaskTrackerExecute(workerJob);

View File

@ -36,7 +36,7 @@
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
@ -394,8 +394,8 @@ void
CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
{ {
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = multiPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
Query *jobQuery = workerJob->jobQuery; Query *jobQuery = workerJob->jobQuery;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
bool deferredPruning = workerJob->deferredPruning; bool deferredPruning = workerJob->deferredPruning;
@ -440,7 +440,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
* We are taking locks on partitions of partitioned tables. These locks are * We are taking locks on partitions of partitioned tables. These locks are
* necessary for locking tables that appear in the SELECT part of the query. * necessary for locking tables that appear in the SELECT part of the query.
*/ */
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
/* modify tasks are always assigned using first-replica policy */ /* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(taskList); workerJob->taskList = FirstReplicaAssignTaskList(taskList);
@ -459,9 +459,9 @@ RouterSequentialModifyExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan) if (!scanState->finishedRemoteScan)
{ {
MultiPlan *multiPlan = scanState->multiPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
bool hasReturning = multiPlan->hasReturning; bool hasReturning = distributedPlan->hasReturning;
Job *workerJob = multiPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
ListCell *taskCell = NULL; ListCell *taskCell = NULL;
bool multipleTasks = list_length(taskList) > 1; bool multipleTasks = list_length(taskList) > 1;
@ -506,10 +506,10 @@ RouterMultiModifyExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan) if (!scanState->finishedRemoteScan)
{ {
MultiPlan *multiPlan = scanState->multiPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = multiPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
bool hasReturning = multiPlan->hasReturning; bool hasReturning = distributedPlan->hasReturning;
bool isModificationQuery = true; bool isModificationQuery = true;
ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning); ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning);
@ -536,12 +536,12 @@ RouterSelectExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan) if (!scanState->finishedRemoteScan)
{ {
MultiPlan *multiPlan = scanState->multiPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = multiPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
/* we are taking locks on partitions of partitioned tables */ /* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
if (list_length(taskList) > 0) if (list_length(taskList) > 0)
{ {
@ -693,7 +693,7 @@ static void
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTasks, ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTasks,
bool expectResults) bool expectResults)
{ {
CmdType operation = scanState->multiPlan->operation; CmdType operation = scanState->distributedPlan->operation;
EState *executorState = scanState->customScanState.ss.ps.state; EState *executorState = scanState->customScanState.ss.ps.state;
ParamListInfo paramListInfo = executorState->es_param_list_info; ParamListInfo paramListInfo = executorState->es_param_list_info;
List *taskPlacementList = task->taskPlacementList; List *taskPlacementList = task->taskPlacementList;

View File

@ -32,22 +32,22 @@ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format
/* /*
* JobExecutorType selects the executor type for the given multiPlan using the task * JobExecutorType selects the executor type for the given distributedPlan using the task
* executor type config value. The function then checks if the given multiPlan needs * executor type config value. The function then checks if the given distributedPlan needs
* more resources than those provided to it by other config values, and issues * more resources than those provided to it by other config values, and issues
* warnings accordingly. If the selected executor type cannot execute the given * warnings accordingly. If the selected executor type cannot execute the given
* multiPlan, the function errors out. * distributedPlan, the function errors out.
*/ */
MultiExecutorType MultiExecutorType
JobExecutorType(MultiPlan *multiPlan) JobExecutorType(DistributedPlan *distributedPlan)
{ {
Job *job = multiPlan->workerJob; Job *job = distributedPlan->workerJob;
List *workerNodeList = NIL; List *workerNodeList = NIL;
int workerNodeCount = 0; int workerNodeCount = 0;
int taskCount = 0; int taskCount = 0;
double tasksPerNode = 0.; double tasksPerNode = 0.;
MultiExecutorType executorType = TaskExecutorType; MultiExecutorType executorType = TaskExecutorType;
bool routerExecutablePlan = multiPlan->routerExecutable; bool routerExecutablePlan = distributedPlan->routerExecutable;
/* check if can switch to router executor */ /* check if can switch to router executor */
if (routerExecutablePlan) if (routerExecutablePlan)
@ -56,12 +56,12 @@ JobExecutorType(MultiPlan *multiPlan)
return MULTI_EXECUTOR_ROUTER; return MULTI_EXECUTOR_ROUTER;
} }
if (multiPlan->insertSelectSubquery != NULL) if (distributedPlan->insertSelectSubquery != NULL)
{ {
return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
} }
Assert(multiPlan->operation == CMD_SELECT); Assert(distributedPlan->operation == CMD_SELECT);
workerNodeList = ActiveReadableNodeList(); workerNodeList = ActiveReadableNodeList();
workerNodeCount = list_length(workerNodeList); workerNodeCount = list_length(workerNodeList);

View File

@ -45,7 +45,7 @@
#include "distributed/multi_copy.h" #include "distributed/multi_copy.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"

View File

@ -1,6 +1,6 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* multi_planner.c * distributed_planner.c
* General Citus planner code. * General Citus planner code.
* *
* Copyright (c) 2012-2016, Citus Data, Inc. * Copyright (c) 2012-2016, Citus Data, Inc.
@ -19,7 +19,7 @@
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
@ -74,8 +74,10 @@ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *origin
static void AdjustParseTree(Query *parse, bool assignRTEIdentities, static void AdjustParseTree(Query *parse, bool assignRTEIdentities,
bool setPartitionedTablesInherited); bool setPartitionedTablesInherited);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan); static PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, DistributedPlan *distributedPlan);
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
DistributedPlan *distributedPlan,
CustomScan *customScan); CustomScan *customScan);
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
static void CheckNodeIsDumpable(Node *node); static void CheckNodeIsDumpable(Node *node);
@ -89,7 +91,7 @@ static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boun
/* Distributed planner hook */ /* Distributed planner hook */
PlannedStmt * PlannedStmt *
multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
{ {
PlannedStmt *result = NULL; PlannedStmt *result = NULL;
bool needsDistributedPlanning = NeedsDistributedPlanning(parse); bool needsDistributedPlanning = NeedsDistributedPlanning(parse);
@ -289,9 +291,9 @@ IsModifyCommand(Query *query)
* multi shard update or delete query. * multi shard update or delete query.
*/ */
bool bool
IsMultiShardModifyPlan(MultiPlan *multiPlan) IsMultiShardModifyPlan(DistributedPlan *distributedPlan)
{ {
if (IsUpdateOrDelete(multiPlan) && IsMultiTaskPlan(multiPlan)) if (IsUpdateOrDelete(distributedPlan) && IsMultiTaskPlan(distributedPlan))
{ {
return true; return true;
} }
@ -304,9 +306,9 @@ IsMultiShardModifyPlan(MultiPlan *multiPlan)
* IsMultiTaskPlan returns true if job contains multiple tasks. * IsMultiTaskPlan returns true if job contains multiple tasks.
*/ */
bool bool
IsMultiTaskPlan(MultiPlan *multiPlan) IsMultiTaskPlan(DistributedPlan *distributedPlan)
{ {
Job *workerJob = multiPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
if (workerJob != NULL && list_length(workerJob->taskList) > 1) if (workerJob != NULL && list_length(workerJob->taskList) > 1)
{ {
@ -321,9 +323,9 @@ IsMultiTaskPlan(MultiPlan *multiPlan)
* IsUpdateOrDelete returns true if the query performs update or delete. * IsUpdateOrDelete returns true if the query performs update or delete.
*/ */
bool bool
IsUpdateOrDelete(MultiPlan *multiPlan) IsUpdateOrDelete(DistributedPlan *distributedPlan)
{ {
CmdType commandType = multiPlan->operation; CmdType commandType = distributedPlan->operation;
if (commandType == CMD_UPDATE || commandType == CMD_DELETE) if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{ {
@ -335,21 +337,21 @@ IsUpdateOrDelete(MultiPlan *multiPlan)
/* /*
* IsModifyMultiPlan returns true if the multi plan performs modifications, * IsModifyDistributedPlan returns true if the multi plan performs modifications,
* false otherwise. * false otherwise.
*/ */
bool bool
IsModifyMultiPlan(MultiPlan *multiPlan) IsModifyDistributedPlan(DistributedPlan *distributedPlan)
{ {
bool isModifyMultiPlan = false; bool isModifyDistributedPlan = false;
CmdType operation = multiPlan->operation; CmdType operation = distributedPlan->operation;
if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE)
{ {
isModifyMultiPlan = true; isModifyDistributedPlan = true;
} }
return isModifyMultiPlan; return isModifyDistributedPlan;
} }
@ -362,7 +364,7 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
ParamListInfo boundParams, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
MultiPlan *distributedPlan = NULL; DistributedPlan *distributedPlan = NULL;
PlannedStmt *resultPlan = NULL; PlannedStmt *resultPlan = NULL;
bool hasUnresolvedParams = false; bool hasUnresolvedParams = false;
@ -435,7 +437,7 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
CheckNodeIsDumpable((Node *) logicalPlan); CheckNodeIsDumpable((Node *) logicalPlan);
/* Create the physical plan */ /* Create the physical plan */
distributedPlan = MultiPhysicalPlanCreate(logicalPlan, distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
plannerRestrictionContext); plannerRestrictionContext);
/* distributed plan currently should always succeed or error out */ /* distributed plan currently should always succeed or error out */
@ -456,7 +458,7 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
{ {
/* currently always should have a more specific error otherwise */ /* currently always should have a more specific error otherwise */
Assert(hasUnresolvedParams); Assert(hasUnresolvedParams);
distributedPlan = CitusMakeNode(MultiPlan); distributedPlan = CitusMakeNode(DistributedPlan);
distributedPlan->planningError = distributedPlan->planningError =
DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"could not create distributed plan", "could not create distributed plan",
@ -505,31 +507,31 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
/* /*
* GetMultiPlan returns the associated MultiPlan for a CustomScan. * GetDistributedPlan returns the associated DistributedPlan for a CustomScan.
*/ */
MultiPlan * DistributedPlan *
GetMultiPlan(CustomScan *customScan) GetDistributedPlan(CustomScan *customScan)
{ {
Node *node = NULL; Node *node = NULL;
MultiPlan *multiPlan = NULL; DistributedPlan *distributedPlan = NULL;
Assert(list_length(customScan->custom_private) == 1); Assert(list_length(customScan->custom_private) == 1);
node = (Node *) linitial(customScan->custom_private); node = (Node *) linitial(customScan->custom_private);
Assert(CitusIsA(node, MultiPlan)); Assert(CitusIsA(node, DistributedPlan));
node = CheckNodeCopyAndSerialization(node); node = CheckNodeCopyAndSerialization(node);
/* /*
* When using prepared statements the same plan gets reused across * When using prepared statements the same plan gets reused across
* multiple statements and transactions. We make several modifications * multiple statements and transactions. We make several modifications
* to the MultiPlan during execution such as assigning task placements * to the DistributedPlan during execution such as assigning task placements
* and evaluating functions and parameters. These changes should not * and evaluating functions and parameters. These changes should not
* persist, so we always work on a copy. * persist, so we always work on a copy.
*/ */
multiPlan = (MultiPlan *) copyObject(node); distributedPlan = (DistributedPlan *) copyObject(node);
return multiPlan; return distributedPlan;
} }
@ -538,16 +540,16 @@ GetMultiPlan(CustomScan *customScan)
* which can be run by the PostgreSQL executor. * which can be run by the PostgreSQL executor.
*/ */
static PlannedStmt * static PlannedStmt *
FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
{ {
PlannedStmt *finalPlan = NULL; PlannedStmt *finalPlan = NULL;
CustomScan *customScan = makeNode(CustomScan); CustomScan *customScan = makeNode(CustomScan);
Node *multiPlanData = NULL; Node *distributedPlanData = NULL;
MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST; MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST;
if (!multiPlan->planningError) if (!distributedPlan->planningError)
{ {
executorType = JobExecutorType(multiPlan); executorType = JobExecutorType(distributedPlan);
} }
switch (executorType) switch (executorType)
@ -583,7 +585,7 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
} }
} }
if (IsMultiTaskPlan(multiPlan)) if (IsMultiTaskPlan(distributedPlan))
{ {
/* if it is not a single task executable plan, inform user according to the log level */ /* if it is not a single task executable plan, inform user according to the log level */
if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF) if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF)
@ -597,16 +599,16 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
} }
} }
multiPlan->relationIdList = localPlan->relationOids; distributedPlan->relationIdList = localPlan->relationOids;
multiPlanData = (Node *) multiPlan; distributedPlanData = (Node *) distributedPlan;
customScan->custom_private = list_make1(multiPlanData); customScan->custom_private = list_make1(distributedPlanData);
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
if (multiPlan->masterQuery) if (distributedPlan->masterQuery)
{ {
finalPlan = FinalizeNonRouterPlan(localPlan, multiPlan, customScan); finalPlan = FinalizeNonRouterPlan(localPlan, distributedPlan, customScan);
} }
else else
{ {
@ -623,12 +625,12 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
* and task-tracker executors. * and task-tracker executors.
*/ */
static PlannedStmt * static PlannedStmt *
FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan,
CustomScan *customScan) CustomScan *customScan)
{ {
PlannedStmt *finalPlan = NULL; PlannedStmt *finalPlan = NULL;
finalPlan = MasterNodeSelectPlan(multiPlan, customScan); finalPlan = MasterNodeSelectPlan(distributedPlan, customScan);
finalPlan->queryId = localPlan->queryId; finalPlan->queryId = localPlan->queryId;
finalPlan->utilityStmt = localPlan->utilityStmt; finalPlan->utilityStmt = localPlan->utilityStmt;

View File

@ -38,7 +38,7 @@
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
static MultiPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext,
@ -61,7 +61,7 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
subqueryRte, subqueryRte,
Oid * Oid *
selectPartitionColumnTableId); selectPartitionColumnTableId);
static MultiPlan * CreateCoordinatorInsertSelectPlan(Query *parse); static DistributedPlan * CreateCoordinatorInsertSelectPlan(Query *parse);
static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery);
static Query * WrapSubquery(Query *subquery); static Query * WrapSubquery(Query *subquery);
static bool CheckInsertSelectQuery(Query *query); static bool CheckInsertSelectQuery(Query *query);
@ -172,11 +172,11 @@ CheckInsertSelectQuery(Query *query)
* command to the workers and if that is not possible it creates a * command to the workers and if that is not possible it creates a
* plan for evaluating the SELECT on the coordinator. * plan for evaluating the SELECT on the coordinator.
*/ */
MultiPlan * DistributedPlan *
CreateInsertSelectPlan(Query *originalQuery, CreateInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
MultiPlan *distributedPlan = NULL; DistributedPlan *distributedPlan = NULL;
distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
plannerRestrictionContext); plannerRestrictionContext);
@ -194,12 +194,12 @@ CreateInsertSelectPlan(Query *originalQuery,
/* /*
* CreateDistributedInsertSelectPlan Creates a MultiPlan for distributed * CreateDistributedInsertSelectPlan Creates a DistributedPlan for distributed
* INSERT ... SELECT queries which could consists of multiple tasks. * INSERT ... SELECT queries which could consists of multiple tasks.
* *
* The function never returns NULL, it errors out if cannot create the MultiPlan. * The function never returns NULL, it errors out if cannot create the DistributedPlan.
*/ */
static MultiPlan * static DistributedPlan *
CreateDistributedInsertSelectPlan(Query *originalQuery, CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
@ -208,7 +208,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */ uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */
Job *workerJob = NULL; Job *workerJob = NULL;
uint64 jobId = INVALID_JOB_ID; uint64 jobId = INVALID_JOB_ID;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery);
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
@ -219,18 +219,19 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
bool allReferenceTables = relationRestrictionContext->allReferenceTables; bool allReferenceTables = relationRestrictionContext->allReferenceTables;
bool safeToPushDownSubquery = false; bool safeToPushDownSubquery = false;
multiPlan->operation = originalQuery->commandType; distributedPlan->operation = originalQuery->commandType;
/* /*
* Error semantics for INSERT ... SELECT queries are different than regular * Error semantics for INSERT ... SELECT queries are different than regular
* modify queries. Thus, handle separately. * modify queries. Thus, handle separately.
*/ */
multiPlan->planningError = DistributedInsertSelectSupported(originalQuery, insertRte, distributedPlan->planningError = DistributedInsertSelectSupported(originalQuery,
insertRte,
subqueryRte, subqueryRte,
allReferenceTables); allReferenceTables);
if (multiPlan->planningError) if (distributedPlan->planningError)
{ {
return multiPlan; return distributedPlan;
} }
safeToPushDownSubquery = SafeToPushDownSubquery(plannerRestrictionContext, safeToPushDownSubquery = SafeToPushDownSubquery(plannerRestrictionContext,
@ -238,7 +239,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
/* /*
* Plan select query for each shard in the target table. Do so by replacing the * Plan select query for each shard in the target table. Do so by replacing the
* partitioning qual parameter added in multi_planner() using the current shard's * partitioning qual parameter added in distributed_planner() using the current shard's
* actual boundary values. Also, add the current shard's boundary values to the * actual boundary values. Also, add the current shard's boundary values to the
* top level subquery to ensure that even if the partitioning qual is not distributed * top level subquery to ensure that even if the partitioning qual is not distributed
* to all the tables, we never run the queries on the shards that don't match with * to all the tables, we never run the queries on the shards that don't match with
@ -277,17 +278,17 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
/* and finally the multi plan */ /* and finally the multi plan */
multiPlan->workerJob = workerJob; distributedPlan->workerJob = workerJob;
multiPlan->masterQuery = NULL; distributedPlan->masterQuery = NULL;
multiPlan->routerExecutable = true; distributedPlan->routerExecutable = true;
multiPlan->hasReturning = false; distributedPlan->hasReturning = false;
if (list_length(originalQuery->returningList) > 0) if (list_length(originalQuery->returningList) > 0)
{ {
multiPlan->hasReturning = true; distributedPlan->hasReturning = true;
} }
return multiPlan; return distributedPlan;
} }
@ -423,7 +424,7 @@ SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext,
/* /*
* RouterModifyTaskForShardInterval creates a modify task by * RouterModifyTaskForShardInterval creates a modify task by
* replacing the partitioning qual parameter added in multi_planner() * replacing the partitioning qual parameter added in distributed_planner()
* with the shardInterval's boundary value. Then perform the normal * with the shardInterval's boundary value. Then perform the normal
* shard pruning on the subquery. Finally, checks if the target shardInterval * shard pruning on the subquery. Finally, checks if the target shardInterval
* has exactly same placements with the select task's available anchor * has exactly same placements with the select task's available anchor
@ -1134,7 +1135,7 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
* CreatteCoordinatorInsertSelectPlan creates a query plan for a SELECT into a * CreatteCoordinatorInsertSelectPlan creates a query plan for a SELECT into a
* distributed table. The query plan can also be executed on a worker in MX. * distributed table. The query plan can also be executed on a worker in MX.
*/ */
static MultiPlan * static DistributedPlan *
CreateCoordinatorInsertSelectPlan(Query *parse) CreateCoordinatorInsertSelectPlan(Query *parse)
{ {
Query *insertSelectQuery = copyObject(parse); Query *insertSelectQuery = copyObject(parse);
@ -1144,15 +1145,15 @@ CreateCoordinatorInsertSelectPlan(Query *parse)
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery);
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
multiPlan->operation = CMD_INSERT; distributedPlan->operation = CMD_INSERT;
multiPlan->planningError = distributedPlan->planningError =
CoordinatorInsertSelectSupported(insertSelectQuery); CoordinatorInsertSelectSupported(insertSelectQuery);
if (multiPlan->planningError != NULL) if (distributedPlan->planningError != NULL)
{ {
return multiPlan; return distributedPlan;
} }
selectQuery = selectRte->subquery; selectQuery = selectRte->subquery;
@ -1183,11 +1184,11 @@ CreateCoordinatorInsertSelectPlan(Query *parse)
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
multiPlan->insertSelectSubquery = selectQuery; distributedPlan->insertSelectSubquery = selectQuery;
multiPlan->insertTargetList = insertSelectQuery->targetList; distributedPlan->insertTargetList = insertSelectQuery->targetList;
multiPlan->targetRelationId = targetRelationId; distributedPlan->targetRelationId = targetRelationId;
return multiPlan; return distributedPlan;
} }

View File

@ -30,7 +30,7 @@
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_master_planner.h" #include "distributed/multi_master_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
@ -112,7 +112,7 @@ void
CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es) CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es)
{ {
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
if (!ExplainDistributedQueries) if (!ExplainDistributedQueries)
{ {
@ -124,7 +124,7 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
ExplainOpenGroup("Distributed Query", "Distributed Query", true, es); ExplainOpenGroup("Distributed Query", "Distributed Query", true, es);
ExplainJob(multiPlan->workerJob, es); ExplainJob(distributedPlan->workerJob, es);
ExplainCloseGroup("Distributed Query", "Distributed Query", true, es); ExplainCloseGroup("Distributed Query", "Distributed Query", true, es);
} }
@ -140,8 +140,8 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es) struct ExplainState *es)
{ {
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *query = multiPlan->insertSelectSubquery; Query *query = distributedPlan->insertSelectSubquery;
IntoClause *into = NULL; IntoClause *into = NULL;
ParamListInfo params = NULL; ParamListInfo params = NULL;
char *queryString = NULL; char *queryString = NULL;

View File

@ -103,7 +103,7 @@ static bool AllTargetExpressionsAreColumnReferences(List *targetEntryList);
static bool RangeTableListContainsOnlyReferenceTables(List *rangeTableList); static bool RangeTableListContainsOnlyReferenceTables(List *rangeTableList);
static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query);
static bool FullCompositeFieldList(List *compositeFieldList); static bool FullCompositeFieldList(List *compositeFieldList);
static MultiNode * MultiPlanTree(Query *queryTree); static MultiNode * MultiNodeTree(Query *queryTree);
static void ErrorIfQueryNotSupported(Query *queryTree); static void ErrorIfQueryNotSupported(Query *queryTree);
static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin( static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin(
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext);
@ -162,13 +162,13 @@ static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNo
* functions will be removed with upcoming subqery changes. * functions will be removed with upcoming subqery changes.
*/ */
static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
static MultiNode * MultiSubqueryPlanTree(Query *originalQuery, static MultiNode * SubqueryMultiNodeTree(Query *originalQuery,
Query *queryTree, Query *queryTree,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
static List * SublinkList(Query *originalQuery); static List * SublinkList(Query *originalQuery);
static bool ExtractSublinkWalker(Node *node, List **sublinkList); static bool ExtractSublinkWalker(Node *node, List **sublinkList);
static MultiNode * SubqueryPushdownMultiPlanTree(Query *queryTree); static MultiNode * SubqueryPushdownMultiNodeTree(Query *queryTree);
static List * CreateSubqueryTargetEntryList(List *columnList); static List * CreateSubqueryTargetEntryList(List *columnList);
static void UpdateVarMappingsForExtendedOpNode(List *columnList, static void UpdateVarMappingsForExtendedOpNode(List *columnList,
@ -203,24 +203,24 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
/* /*
* We check the existence of subqueries in FROM clause on the modified query * We check the existence of subqueries in FROM clause on the modified query
* given that if postgres already flattened the subqueries, MultiPlanTree() * given that if postgres already flattened the subqueries, MultiNodeTree()
* can plan corresponding distributed plan. * can plan corresponding distributed plan.
* *
* We also check the existence of subqueries in WHERE clause. Note that * We also check the existence of subqueries in WHERE clause. Note that
* this check needs to be done on the original query given that * this check needs to be done on the original query given that
* standard_planner() may replace the sublinks with anti/semi joins and * standard_planner() may replace the sublinks with anti/semi joins and
* MultiPlanTree() cannot plan such queries. * MultiNodeTree() cannot plan such queries.
*/ */
if (SubqueryEntryList(queryTree) != NIL || SublinkList(originalQuery) != NIL) if (SubqueryEntryList(queryTree) != NIL || SublinkList(originalQuery) != NIL)
{ {
originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery, originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
boundParams); boundParams);
multiQueryNode = MultiSubqueryPlanTree(originalQuery, queryTree, multiQueryNode = SubqueryMultiNodeTree(originalQuery, queryTree,
plannerRestrictionContext); plannerRestrictionContext);
} }
else else
{ {
multiQueryNode = MultiPlanTree(queryTree); multiQueryNode = MultiNodeTree(queryTree);
} }
/* add a root node to serve as the permanent handle to the tree */ /* add a root node to serve as the permanent handle to the tree */
@ -376,7 +376,7 @@ ExtractSublinkWalker(Node *node, List **sublinkList)
/* /*
* MultiSubqueryPlanTree gets the query objects and returns logical plan * SubqueryMultiNodeTree gets the query objects and returns logical plan
* for subqueries. * for subqueries.
* *
* We currently have two different code paths for creating logic plan for subqueries: * We currently have two different code paths for creating logic plan for subqueries:
@ -395,7 +395,7 @@ ExtractSublinkWalker(Node *node, List **sublinkList)
* - If found errors, throw it * - If found errors, throw it
*/ */
static MultiNode * static MultiNode *
MultiSubqueryPlanTree(Query *originalQuery, Query *queryTree, SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
MultiNode *multiQueryNode = NULL; MultiNode *multiQueryNode = NULL;
@ -416,7 +416,7 @@ MultiSubqueryPlanTree(Query *originalQuery, Query *queryTree,
plannerRestrictionContext); plannerRestrictionContext);
if (!subqueryPushdownError) if (!subqueryPushdownError)
{ {
multiQueryNode = SubqueryPushdownMultiPlanTree(originalQuery); multiQueryNode = SubqueryPushdownMultiNodeTree(originalQuery);
} }
else if (subqueryPushdownError) else if (subqueryPushdownError)
{ {
@ -450,7 +450,7 @@ MultiSubqueryPlanTree(Query *originalQuery, Query *queryTree,
} }
/* all checks has passed, safe to create the multi plan */ /* all checks has passed, safe to create the multi plan */
multiQueryNode = MultiPlanTree(queryTree); multiQueryNode = MultiNodeTree(queryTree);
} }
Assert(multiQueryNode != NULL); Assert(multiQueryNode != NULL);
@ -1591,7 +1591,7 @@ SubqueryEntryList(Query *queryTree)
/* /*
* MultiPlanTree takes in a parsed query tree and uses that tree to construct a * MultiNodeTree takes in a parsed query tree and uses that tree to construct a
* logical plan. This plan is based on multi-relational algebra. This function * logical plan. This plan is based on multi-relational algebra. This function
* creates the logical plan in several steps. * creates the logical plan in several steps.
* *
@ -1609,7 +1609,7 @@ SubqueryEntryList(Query *queryTree)
* group, and limit nodes if they appear in the original query tree. * group, and limit nodes if they appear in the original query tree.
*/ */
static MultiNode * static MultiNode *
MultiPlanTree(Query *queryTree) MultiNodeTree(Query *queryTree)
{ {
List *rangeTableList = queryTree->rtable; List *rangeTableList = queryTree->rtable;
List *targetEntryList = queryTree->targetList; List *targetEntryList = queryTree->targetList;
@ -1686,7 +1686,7 @@ MultiPlanTree(Query *queryTree)
} }
/* recursively create child nested multitree */ /* recursively create child nested multitree */
subqueryExtendedNode = MultiPlanTree(subqueryTree); subqueryExtendedNode = MultiNodeTree(subqueryTree);
SetChild((MultiUnaryNode *) subqueryCollectNode, (MultiNode *) subqueryNode); SetChild((MultiUnaryNode *) subqueryCollectNode, (MultiNode *) subqueryNode);
SetChild((MultiUnaryNode *) subqueryNode, subqueryExtendedNode); SetChild((MultiUnaryNode *) subqueryNode, subqueryExtendedNode);
@ -3708,7 +3708,7 @@ ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode,
/* /*
* SubqueryPushdownMultiTree creates logical plan for subquery pushdown logic. * SubqueryPushdownMultiNodeTree creates logical plan for subquery pushdown logic.
* Note that this logic will be changed in next iterations, so we decoupled it * Note that this logic will be changed in next iterations, so we decoupled it
* from other parts of code although it causes some code duplication. * from other parts of code although it causes some code duplication.
* *
@ -3719,7 +3719,7 @@ ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode,
* down to workers without invoking join order planner. * down to workers without invoking join order planner.
*/ */
static MultiNode * static MultiNode *
SubqueryPushdownMultiPlanTree(Query *queryTree) SubqueryPushdownMultiNodeTree(Query *queryTree)
{ {
List *targetEntryList = queryTree->targetList; List *targetEntryList = queryTree->targetList;
List *qualifierList = NIL; List *qualifierList = NIL;

View File

@ -15,7 +15,7 @@
#include "distributed/multi_master_planner.h" #include "distributed/multi_master_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
@ -47,12 +47,12 @@ static Plan * BuildDistinctPlan(Query *masterQuery, Plan *subPlan);
* filled into the tuple store inside provided custom scan. * filled into the tuple store inside provided custom scan.
*/ */
PlannedStmt * PlannedStmt *
MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *remoteScan) MasterNodeSelectPlan(DistributedPlan *distributedPlan, CustomScan *remoteScan)
{ {
Query *masterQuery = multiPlan->masterQuery; Query *masterQuery = distributedPlan->masterQuery;
PlannedStmt *masterSelectPlan = NULL; PlannedStmt *masterSelectPlan = NULL;
Job *workerJob = multiPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
List *workerTargetList = workerJob->jobQuery->targetList; List *workerTargetList = workerJob->jobQuery->targetList;
List *masterTargetList = MasterTargetList(workerTargetList); List *masterTargetList = MasterTargetList(workerTargetList);

View File

@ -121,7 +121,7 @@ static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray,
uint32 shardIntervalCount); uint32 shardIntervalCount);
/* Local functions forward declarations for task list creation and helper functions */ /* Local functions forward declarations for task list creation and helper functions */
static bool MultiPlanRouterExecutable(MultiPlan *multiPlan); static bool DistributedPlanRouterExecutable(DistributedPlan *distributedPlan);
static Job * BuildJobTreeTaskList(Job *jobTree, static Job * BuildJobTreeTaskList(Job *jobTree,
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext);
static List * SubquerySqlTaskList(Job *job, static List * SubquerySqlTaskList(Job *job,
@ -198,15 +198,15 @@ static uint32 FinalTargetEntryCount(List *targetEntryList);
/* /*
* MultiPhysicalPlanCreate is the entry point for physical plan generation. The * CreatePhysicalDistributedPlan is the entry point for physical plan generation. The
* function builds the physical plan; this plan includes the list of tasks to be * function builds the physical plan; this plan includes the list of tasks to be
* executed on worker nodes, and the final query to run on the master node. * executed on worker nodes, and the final query to run on the master node.
*/ */
MultiPlan * DistributedPlan *
MultiPhysicalPlanCreate(MultiTreeRoot *multiTree, CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
MultiPlan *multiPlan = NULL; DistributedPlan *distributedPlan = NULL;
Job *workerJob = NULL; Job *workerJob = NULL;
Query *masterQuery = NULL; Query *masterQuery = NULL;
List *masterDependedJobList = NIL; List *masterDependedJobList = NIL;
@ -221,18 +221,18 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree,
masterDependedJobList = list_make1(workerJob); masterDependedJobList = list_make1(workerJob);
masterQuery = BuildJobQuery((MultiNode *) multiTree, masterDependedJobList); masterQuery = BuildJobQuery((MultiNode *) multiTree, masterDependedJobList);
multiPlan = CitusMakeNode(MultiPlan); distributedPlan = CitusMakeNode(DistributedPlan);
multiPlan->workerJob = workerJob; distributedPlan->workerJob = workerJob;
multiPlan->masterQuery = masterQuery; distributedPlan->masterQuery = masterQuery;
multiPlan->routerExecutable = MultiPlanRouterExecutable(multiPlan); distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan);
multiPlan->operation = CMD_SELECT; distributedPlan->operation = CMD_SELECT;
return multiPlan; return distributedPlan;
} }
/* /*
* MultiPlanRouterExecutable returns true if the input multiPlan is * DistributedPlanRouterExecutable returns true if the input distributedPlan is
* router executable. * router executable.
* *
* Note that all the multi plans that are created by router planner are * Note that all the multi plans that are created by router planner are
@ -240,10 +240,10 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree,
* for multi plans that are not generated by router planner. * for multi plans that are not generated by router planner.
*/ */
static bool static bool
MultiPlanRouterExecutable(MultiPlan *multiPlan) DistributedPlanRouterExecutable(DistributedPlan *distributedPlan)
{ {
Query *masterQuery = multiPlan->masterQuery; Query *masterQuery = distributedPlan->masterQuery;
Job *job = multiPlan->workerJob; Job *job = distributedPlan->workerJob;
List *workerTaskList = job->taskList; List *workerTaskList = job->taskList;
int taskCount = list_length(workerTaskList); int taskCount = list_length(workerTaskList);
int dependedJobCount = list_length(job->dependedJobList); int dependedJobCount = list_length(job->dependedJobList);

View File

@ -106,7 +106,7 @@ bool EnableRouterExecution = true;
/* planner functions forward declarations */ /* planner functions forward declarations */
static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, static DistributedPlan * CreateSingleTaskRouterPlan(Query *originalQuery,
Query *query, Query *query,
RelationRestrictionContext * RelationRestrictionContext *
restrictionContext); restrictionContext);
@ -158,7 +158,7 @@ static List * MultiShardModifyTaskList(Query *originalQuery, List *relationShard
* SELECT statement. If planning fails either NULL is returned, or * SELECT statement. If planning fails either NULL is returned, or
* ->planningError is set to a description of the failure. * ->planningError is set to a description of the failure.
*/ */
MultiPlan * DistributedPlan *
CreateRouterPlan(Query *originalQuery, Query *query, CreateRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext) RelationRestrictionContext *restrictionContext)
{ {
@ -183,20 +183,20 @@ CreateRouterPlan(Query *originalQuery, Query *query,
* statement. If planning fails ->planningError is set to a description of * statement. If planning fails ->planningError is set to a description of
* the failure. * the failure.
*/ */
MultiPlan * DistributedPlan *
CreateModifyPlan(Query *originalQuery, Query *query, CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
Job *job = NULL; Job *job = NULL;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
bool multiShardQuery = false; bool multiShardQuery = false;
multiPlan->operation = query->commandType; distributedPlan->operation = query->commandType;
multiPlan->planningError = ModifyQuerySupported(query, multiShardQuery); distributedPlan->planningError = ModifyQuerySupported(query, multiShardQuery);
if (multiPlan->planningError != NULL) if (distributedPlan->planningError != NULL)
{ {
return multiPlan; return distributedPlan;
} }
if (UpdateOrDeleteQuery(query)) if (UpdateOrDeleteQuery(query))
@ -204,31 +204,32 @@ CreateModifyPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext = RelationRestrictionContext *restrictionContext =
plannerRestrictionContext->relationRestrictionContext; plannerRestrictionContext->relationRestrictionContext;
job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError); job = RouterJob(originalQuery, restrictionContext,
&distributedPlan->planningError);
} }
else else
{ {
job = RouterInsertJob(originalQuery, query, &multiPlan->planningError); job = RouterInsertJob(originalQuery, query, &distributedPlan->planningError);
} }
if (multiPlan->planningError != NULL) if (distributedPlan->planningError != NULL)
{ {
return multiPlan; return distributedPlan;
} }
ereport(DEBUG2, (errmsg("Creating router plan"))); ereport(DEBUG2, (errmsg("Creating router plan")));
multiPlan->workerJob = job; distributedPlan->workerJob = job;
multiPlan->masterQuery = NULL; distributedPlan->masterQuery = NULL;
multiPlan->routerExecutable = true; distributedPlan->routerExecutable = true;
multiPlan->hasReturning = false; distributedPlan->hasReturning = false;
if (list_length(originalQuery->returningList) > 0) if (list_length(originalQuery->returningList) > 0)
{ {
multiPlan->hasReturning = true; distributedPlan->hasReturning = true;
} }
return multiPlan; return distributedPlan;
} }
@ -239,26 +240,26 @@ CreateModifyPlan(Query *originalQuery, Query *query,
* are router plannable by default. If query is not router plannable then either NULL is * are router plannable by default. If query is not router plannable then either NULL is
* returned, or the returned plan has planningError set to a description of the problem. * returned, or the returned plan has planningError set to a description of the problem.
*/ */
static MultiPlan * static DistributedPlan *
CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext) RelationRestrictionContext *restrictionContext)
{ {
Job *job = NULL; Job *job = NULL;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
multiPlan->operation = query->commandType; distributedPlan->operation = query->commandType;
/* FIXME: this should probably rather be inlined into CreateRouterPlan */ /* FIXME: this should probably rather be inlined into CreateRouterPlan */
multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query); distributedPlan->planningError = ErrorIfQueryHasModifyingCTE(query);
if (multiPlan->planningError) if (distributedPlan->planningError)
{ {
return multiPlan; return distributedPlan;
} }
/* we cannot have multi shard update/delete query via this code path */ /* we cannot have multi shard update/delete query via this code path */
job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError); job = RouterJob(originalQuery, restrictionContext, &distributedPlan->planningError);
if (multiPlan->planningError) if (distributedPlan->planningError)
{ {
/* query cannot be handled by this planner */ /* query cannot be handled by this planner */
return NULL; return NULL;
@ -266,12 +267,12 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
ereport(DEBUG2, (errmsg("Creating router plan"))); ereport(DEBUG2, (errmsg("Creating router plan")));
multiPlan->workerJob = job; distributedPlan->workerJob = job;
multiPlan->masterQuery = NULL; distributedPlan->masterQuery = NULL;
multiPlan->routerExecutable = true; distributedPlan->routerExecutable = true;
multiPlan->hasReturning = false; distributedPlan->hasReturning = false;
return multiPlan; return distributedPlan;
} }

View File

@ -10,7 +10,7 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"

View File

@ -54,7 +54,7 @@
#include "catalog/pg_collation.h" #include "catalog/pg_collation.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"

View File

@ -31,7 +31,7 @@
#include "distributed/multi_explain.h" #include "distributed/multi_explain.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
@ -189,7 +189,7 @@ _PG_init(void)
RegisterNodes(); RegisterNodes();
/* intercept planner */ /* intercept planner */
planner_hook = multi_planner; planner_hook = distributed_planner;
/* register utility hook */ /* register utility hook */
#if (PG_VERSION_NUM >= 100000) #if (PG_VERSION_NUM >= 100000)

View File

@ -97,9 +97,9 @@ CopyNodeJob(COPYFUNC_ARGS)
void void
CopyNodeMultiPlan(COPYFUNC_ARGS) CopyNodeDistributedPlan(COPYFUNC_ARGS)
{ {
DECLARE_FROM_AND_NEW_NODE(MultiPlan); DECLARE_FROM_AND_NEW_NODE(DistributedPlan);
COPY_SCALAR_FIELD(operation); COPY_SCALAR_FIELD(operation);
COPY_SCALAR_FIELD(hasReturning); COPY_SCALAR_FIELD(hasReturning);

View File

@ -15,7 +15,7 @@
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
static const char *CitusNodeTagNamesD[] = { static const char *CitusNodeTagNamesD[] = {
@ -31,7 +31,7 @@ static const char *CitusNodeTagNamesD[] = {
"MultiExtendedOp", "MultiExtendedOp",
"Job", "Job",
"MapMergeJob", "MapMergeJob",
"MultiPlan", "DistributedPlan",
"Task", "Task",
"TaskExecution", "TaskExecution",
"ShardInterval", "ShardInterval",
@ -383,7 +383,7 @@ EqualUnsupportedCitusNode(const struct ExtensibleNode *a,
/* *INDENT-ON* */ /* *INDENT-ON* */
const ExtensibleNodeMethods nodeMethods[] = const ExtensibleNodeMethods nodeMethods[] =
{ {
DEFINE_NODE_METHODS(MultiPlan), DEFINE_NODE_METHODS(DistributedPlan),
DEFINE_NODE_METHODS(Job), DEFINE_NODE_METHODS(Job),
DEFINE_NODE_METHODS(ShardInterval), DEFINE_NODE_METHODS(ShardInterval),
DEFINE_NODE_METHODS(MapMergeJob), DEFINE_NODE_METHODS(MapMergeJob),

View File

@ -25,7 +25,7 @@
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@ -173,11 +173,11 @@ OutMultiTreeRoot(OUTFUNC_ARGS)
void void
OutMultiPlan(OUTFUNC_ARGS) OutDistributedPlan(OUTFUNC_ARGS)
{ {
WRITE_LOCALS(MultiPlan); WRITE_LOCALS(DistributedPlan);
WRITE_NODE_TYPE("MULTIPLAN"); WRITE_NODE_TYPE("DISTRIBUTEDPLAN");
WRITE_INT_FIELD(operation); WRITE_INT_FIELD(operation);
WRITE_BOOL_FIELD(hasReturning); WRITE_BOOL_FIELD(hasReturning);

View File

@ -15,7 +15,7 @@
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "nodes/readfuncs.h" #include "nodes/readfuncs.h"
@ -195,9 +195,9 @@ ReadJob(READFUNC_ARGS)
READFUNC_RET READFUNC_RET
ReadMultiPlan(READFUNC_ARGS) ReadDistributedPlan(READFUNC_ARGS)
{ {
READ_LOCALS(MultiPlan); READ_LOCALS(DistributedPlan);
READ_INT_FIELD(operation); READ_INT_FIELD(operation);
READ_BOOL_FIELD(hasReturning); READ_BOOL_FIELD(hasReturning);

View File

@ -22,7 +22,7 @@
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"

View File

@ -16,7 +16,7 @@
#include "catalog/pg_collation.h" #include "catalog/pg_collation.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/shard_pruning.h" #include "distributed/shard_pruning.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"

View File

@ -43,7 +43,7 @@ extern void RegisterNodes(void);
ExtensibleNode *source_node ExtensibleNode *source_node
extern READFUNC_RET ReadJob(READFUNC_ARGS); extern READFUNC_RET ReadJob(READFUNC_ARGS);
extern READFUNC_RET ReadMultiPlan(READFUNC_ARGS); extern READFUNC_RET ReadDistributedPlan(READFUNC_ARGS);
extern READFUNC_RET ReadShardInterval(READFUNC_ARGS); extern READFUNC_RET ReadShardInterval(READFUNC_ARGS);
extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS); extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
@ -56,7 +56,7 @@ extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS); extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS);
extern void OutJob(OUTFUNC_ARGS); extern void OutJob(OUTFUNC_ARGS);
extern void OutMultiPlan(OUTFUNC_ARGS); extern void OutDistributedPlan(OUTFUNC_ARGS);
extern void OutShardInterval(OUTFUNC_ARGS); extern void OutShardInterval(OUTFUNC_ARGS);
extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutMapMergeJob(OUTFUNC_ARGS);
extern void OutShardPlacement(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS);
@ -78,7 +78,7 @@ extern void OutMultiCartesianProduct(OUTFUNC_ARGS);
extern void OutMultiExtendedOp(OUTFUNC_ARGS); extern void OutMultiExtendedOp(OUTFUNC_ARGS);
extern void CopyNodeJob(COPYFUNC_ARGS); extern void CopyNodeJob(COPYFUNC_ARGS);
extern void CopyNodeMultiPlan(COPYFUNC_ARGS); extern void CopyNodeDistributedPlan(COPYFUNC_ARGS);
extern void CopyNodeShardInterval(COPYFUNC_ARGS); extern void CopyNodeShardInterval(COPYFUNC_ARGS);
extern void CopyNodeMapMergeJob(COPYFUNC_ARGS); extern void CopyNodeMapMergeJob(COPYFUNC_ARGS);
extern void CopyNodeShardPlacement(COPYFUNC_ARGS); extern void CopyNodeShardPlacement(COPYFUNC_ARGS);

View File

@ -55,7 +55,7 @@ typedef enum CitusNodeTag
T_MultiExtendedOp, T_MultiExtendedOp,
T_Job, T_Job,
T_MapMergeJob, T_MapMergeJob,
T_MultiPlan, T_DistributedPlan,
T_Task, T_Task,
T_TaskExecution, T_TaskExecution,
T_ShardInterval, T_ShardInterval,

View File

@ -1,14 +1,14 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* multi_planner.h * distributed_planner.h
* General Citus planner code. * General Citus planner code.
* *
* Copyright (c) 2012-2016, Citus Data, Inc. * Copyright (c) 2012-2016, Citus Data, Inc.
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#ifndef MULTI_PLANNER_H #ifndef DISTRIBUTED_PLANNER_H
#define MULTI_PLANNER_H #define DISTRIBUTED_PLANNER_H
#include "nodes/plannodes.h" #include "nodes/plannodes.h"
#include "nodes/relation.h" #include "nodes/relation.h"
@ -71,9 +71,9 @@ typedef struct RelationShard
} RelationShard; } RelationShard;
extern PlannedStmt * multi_planner(Query *parse, int cursorOptions, extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions,
ParamListInfo boundParams); ParamListInfo boundParams);
extern struct MultiPlan * GetMultiPlan(CustomScan *node); extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index index, RangeTblEntry *rte); Index index, RangeTblEntry *rte);
extern void multi_join_restriction_hook(PlannerInfo *root, extern void multi_join_restriction_hook(PlannerInfo *root,
@ -83,13 +83,13 @@ extern void multi_join_restriction_hook(PlannerInfo *root,
JoinType jointype, JoinType jointype,
JoinPathExtraData *extra); JoinPathExtraData *extra);
extern bool IsModifyCommand(Query *query); extern bool IsModifyCommand(Query *query);
extern bool IsUpdateOrDelete(struct MultiPlan *multiPlan); extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan);
extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan);
extern bool IsMultiTaskPlan(struct MultiPlan *multiPlan); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
extern bool IsMultiShardModifyPlan(struct MultiPlan *multiPlan); extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
extern int GetRTEIdentity(RangeTblEntry *rte); extern int GetRTEIdentity(RangeTblEntry *rte);
#endif /* MULTI_PLANNER_H */ #endif /* DISTRIBUTED_PLANNER_H */

View File

@ -17,7 +17,7 @@
#include "postgres.h" #include "postgres.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "nodes/execnodes.h" #include "nodes/execnodes.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "nodes/plannodes.h" #include "nodes/plannodes.h"
@ -30,7 +30,7 @@ extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
RangeTblEntry *subqueryRte); RangeTblEntry *subqueryRte);
extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es); struct ExplainState *es);
extern MultiPlan * CreateInsertSelectPlan(Query *originalQuery, extern DistributedPlan * CreateInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);

View File

@ -21,7 +21,7 @@
typedef struct CitusScanState typedef struct CitusScanState
{ {
CustomScanState customScanState; /* underlying custom scan node */ CustomScanState customScanState; /* underlying custom scan node */
MultiPlan *multiPlan; /* distributed execution plan */ DistributedPlan *distributedPlan; /* distributed execution plan */
MultiExecutorType executorType; /* distributed executor type */ MultiExecutorType executorType; /* distributed executor type */
bool finishedRemoteScan; /* flag to check if remote scan is finished */ bool finishedRemoteScan; /* flag to check if remote scan is finished */
Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */

View File

@ -20,9 +20,9 @@
/* Function declarations for building local plans on the master node */ /* Function declarations for building local plans on the master node */
struct MultiPlan; struct DistributedPlan;
struct CustomScan; struct CustomScan;
extern PlannedStmt * MasterNodeSelectPlan(struct MultiPlan *multiPlan, extern PlannedStmt * MasterNodeSelectPlan(struct DistributedPlan *distributedPlan,
struct CustomScan *dataScan); struct CustomScan *dataScan);
extern Unique * make_unique_from_sortclauses(Plan *lefttree, List *distinctList); extern Unique * make_unique_from_sortclauses(Plan *lefttree, List *distinctList);

View File

@ -22,7 +22,7 @@
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "utils/array.h" #include "utils/array.h"
@ -217,22 +217,38 @@ typedef struct JoinSequenceNode
/* /*
* MultiPlan * DistributedPlan contains all information necessary to execute a
* distribute query.
*/ */
typedef struct MultiPlan typedef struct DistributedPlan
{ {
CitusNode type; CitusNode type;
/* type of command to execute (SELECT/INSERT/...) */
CmdType operation; CmdType operation;
/* specifies whether a DML command has a RETURNING */
bool hasReturning; bool hasReturning;
/* job tree containing the tasks to be executed on workers */
Job *workerJob; Job *workerJob;
/* local query that merges results from the workers */
Query *masterQuery; Query *masterQuery;
/* a router executable query is executed entirely on a worker */
bool routerExecutable; bool routerExecutable;
/* which relations are accessed by this distributed plan */
List *relationIdList; List *relationIdList;
/* INSERT ... SELECT via coordinator only */ /* SELECT query in an INSERT ... SELECT via the coordinator */
Query *insertSelectSubquery; Query *insertSelectSubquery;
/* target list of an INSERT ... SELECT via the coordinator */
List *insertTargetList; List *insertTargetList;
/* target relation of an INSERT ... SELECT via the coordinator */
Oid targetRelationId; Oid targetRelationId;
/* /*
@ -241,7 +257,7 @@ typedef struct MultiPlan
* or if prepared statement parameters prevented successful planning. * or if prepared statement parameters prevented successful planning.
*/ */
DeferredErrorMessage *planningError; DeferredErrorMessage *planningError;
} MultiPlan; } DistributedPlan;
/* OperatorCacheEntry contains information for each element in OperatorCache */ /* OperatorCacheEntry contains information for each element in OperatorCache */
@ -263,7 +279,7 @@ extern bool EnableUniqueJobIds;
/* Function declarations for building physical plans and constructing queries */ /* Function declarations for building physical plans and constructing queries */
extern MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree, extern DistributedPlan * CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern StringInfo ShardFetchQueryString(uint64 shardId); extern StringInfo ShardFetchQueryString(uint64 shardId);

View File

@ -17,7 +17,7 @@
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
@ -26,9 +26,9 @@
extern bool EnableRouterExecution; extern bool EnableRouterExecution;
extern MultiPlan * CreateRouterPlan(Query *originalQuery, Query *query, extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext); RelationRestrictionContext *restrictionContext);
extern MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query, extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,

View File

@ -195,7 +195,7 @@ extern void MultiRealTimeExecute(Job *job);
extern void MultiTaskTrackerExecute(Job *job); extern void MultiTaskTrackerExecute(Job *job);
/* Function declarations common to more than one executor */ /* Function declarations common to more than one executor */
extern MultiExecutorType JobExecutorType(MultiPlan *multiPlan); extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan);
extern void RemoveJobDirectory(uint64 jobId); extern void RemoveJobDirectory(uint64 jobId);
extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus); extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus);
extern void CleanupTaskExecution(TaskExecution *taskExecution); extern void CleanupTaskExecution(TaskExecution *taskExecution);

View File

@ -12,7 +12,7 @@
#ifndef RELATION_RESTRICTION_EQUIVALENCE_H #ifndef RELATION_RESTRICTION_EQUIVALENCE_H
#define RELATION_RESTRICTION_EQUIVALENCE_H #define RELATION_RESTRICTION_EQUIVALENCE_H
#include "distributed/multi_planner.h" #include "distributed/distributed_planner.h"
extern bool ContainsUnionSubquery(Query *queryTree); extern bool ContainsUnionSubquery(Query *queryTree);