From 6ba3f42d23c78e03d6acafcc3f6d4deaf8228da4 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 16 Nov 2017 15:19:12 +0100 Subject: [PATCH] Rename MultiPlan to DistributedPlan --- .../connection/placement_connection.c | 2 +- .../executor/insert_select_executor.c | 10 +-- .../distributed/executor/multi_executor.c | 36 ++++---- .../executor/multi_router_executor.c | 28 +++---- .../executor/multi_server_executor.c | 16 ++-- .../distributed/executor/multi_utility.c | 2 +- ...{multi_planner.c => distributed_planner.c} | 84 ++++++++++--------- .../planner/insert_select_planner.c | 69 +++++++-------- .../distributed/planner/multi_explain.c | 10 +-- .../planner/multi_logical_planner.c | 32 +++---- .../planner/multi_master_planner.c | 8 +- .../planner/multi_physical_planner.c | 32 +++---- .../planner/multi_router_planner.c | 69 +++++++-------- .../relation_restriction_equivalence.c | 2 +- .../distributed/planner/shard_pruning.c | 2 +- src/backend/distributed/shared_library_init.c | 4 +- .../distributed/utils/citus_copyfuncs.c | 4 +- .../distributed/utils/citus_nodefuncs.c | 6 +- .../distributed/utils/citus_outfuncs.c | 8 +- .../distributed/utils/citus_readfuncs.c | 6 +- src/backend/distributed/utils/resource_lock.c | 2 +- .../distributed/utils/shardinterval_utils.c | 2 +- src/include/distributed/citus_nodefuncs.h | 6 +- src/include/distributed/citus_nodes.h | 2 +- ...{multi_planner.h => distributed_planner.h} | 22 ++--- .../distributed/insert_select_planner.h | 8 +- src/include/distributed/multi_executor.h | 2 +- .../distributed/multi_master_planner.h | 4 +- .../distributed/multi_physical_planner.h | 32 +++++-- .../distributed/multi_router_planner.h | 12 +-- .../distributed/multi_server_executor.h | 2 +- .../relation_restriction_equivalence.h | 2 +- 32 files changed, 273 insertions(+), 253 deletions(-) rename src/backend/distributed/planner/{multi_planner.c => distributed_planner.c} (93%) rename src/include/distributed/{multi_planner.h => distributed_planner.h} (77%) diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 1d3ab9008..3554180d6 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -17,7 +17,7 @@ #include "distributed/hash_helpers.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/placement_connection.h" #include "utils/hsearch.h" #include "utils/memutils.h" diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index ff331a682..b71082747 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -16,7 +16,7 @@ #include "distributed/multi_executor.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "executor/executor.h" @@ -55,10 +55,10 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { EState *executorState = scanState->customScanState.ss.ps.state; - MultiPlan *multiPlan = scanState->multiPlan; - Query *selectQuery = multiPlan->insertSelectSubquery; - List *insertTargetList = multiPlan->insertTargetList; - Oid targetRelationId = multiPlan->targetRelationId; + DistributedPlan *distributedPlan = scanState->distributedPlan; + Query *selectQuery = distributedPlan->insertSelectSubquery; + List *insertTargetList = distributedPlan->insertTargetList; + Oid targetRelationId = distributedPlan->targetRelationId; ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator"))); diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index db45c9c8e..3747be4bf 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -20,7 +20,7 @@ #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" #include "distributed/multi_master_planner.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_resowner.h" @@ -115,7 +115,7 @@ RealTimeCreateScan(CustomScan *scan) scanState->executorType = MULTI_EXECUTOR_REAL_TIME; scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->multiPlan = GetMultiPlan(scan); + scanState->distributedPlan = GetDistributedPlan(scan); scanState->customScanState.methods = &RealTimeCustomExecMethods; @@ -133,7 +133,7 @@ TaskTrackerCreateScan(CustomScan *scan) scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER; scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->multiPlan = GetMultiPlan(scan); + scanState->distributedPlan = GetDistributedPlan(scan); scanState->customScanState.methods = &TaskTrackerCustomExecMethods; @@ -148,20 +148,20 @@ Node * RouterCreateScan(CustomScan *scan) { CitusScanState *scanState = palloc0(sizeof(CitusScanState)); - MultiPlan *multiPlan = NULL; + DistributedPlan *distributedPlan = NULL; Job *workerJob = NULL; List *taskList = NIL; bool isModificationQuery = false; scanState->executorType = MULTI_EXECUTOR_ROUTER; scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->multiPlan = GetMultiPlan(scan); + scanState->distributedPlan = GetDistributedPlan(scan); - multiPlan = scanState->multiPlan; - workerJob = multiPlan->workerJob; + distributedPlan = scanState->distributedPlan; + workerJob = distributedPlan->workerJob; taskList = workerJob->taskList; - isModificationQuery = IsModifyMultiPlan(multiPlan); + isModificationQuery = IsModifyDistributedPlan(distributedPlan); /* check whether query has at most one shard */ if (list_length(taskList) <= 1) @@ -180,7 +180,7 @@ RouterCreateScan(CustomScan *scan) Assert(isModificationQuery); if (IsMultiRowInsert(workerJob->jobQuery) || - (IsUpdateOrDelete(multiPlan) && + (IsUpdateOrDelete(distributedPlan) && MultiShardConnectionType == SEQUENTIAL_CONNECTION)) { /* @@ -211,7 +211,7 @@ CoordinatorInsertSelectCreateScan(CustomScan *scan) scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->multiPlan = GetMultiPlan(scan); + scanState->distributedPlan = GetDistributedPlan(scan); scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods; @@ -231,10 +231,10 @@ CoordinatorInsertSelectCreateScan(CustomScan *scan) Node * DelayedErrorCreateScan(CustomScan *scan) { - MultiPlan *multiPlan = GetMultiPlan(scan); + DistributedPlan *distributedPlan = GetDistributedPlan(scan); /* raise the deferred error */ - RaiseDeferredError(multiPlan->planningError, ERROR); + RaiseDeferredError(distributedPlan->planningError, ERROR); return NULL; } @@ -264,11 +264,11 @@ RealTimeExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - MultiPlan *multiPlan = scanState->multiPlan; - Job *workerJob = multiPlan->workerJob; + DistributedPlan *distributedPlan = scanState->distributedPlan; + Job *workerJob = distributedPlan->workerJob; /* we are taking locks on partitions of partitioned tables */ - LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); PrepareMasterJobDirectory(workerJob); MultiRealTimeExecute(workerJob); @@ -464,11 +464,11 @@ TaskTrackerExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - MultiPlan *multiPlan = scanState->multiPlan; - Job *workerJob = multiPlan->workerJob; + DistributedPlan *distributedPlan = scanState->distributedPlan; + Job *workerJob = distributedPlan->workerJob; /* we are taking locks on partitions of partitioned tables */ - LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); PrepareMasterJobDirectory(workerJob); MultiTaskTrackerExecute(workerJob); diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 4acda6ff8..438307e1f 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -36,7 +36,7 @@ #include "distributed/multi_executor.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" @@ -394,8 +394,8 @@ void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) { CitusScanState *scanState = (CitusScanState *) node; - MultiPlan *multiPlan = scanState->multiPlan; - Job *workerJob = multiPlan->workerJob; + DistributedPlan *distributedPlan = scanState->distributedPlan; + Job *workerJob = distributedPlan->workerJob; Query *jobQuery = workerJob->jobQuery; List *taskList = workerJob->taskList; bool deferredPruning = workerJob->deferredPruning; @@ -440,7 +440,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) * We are taking locks on partitions of partitioned tables. These locks are * necessary for locking tables that appear in the SELECT part of the query. */ - LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); /* modify tasks are always assigned using first-replica policy */ workerJob->taskList = FirstReplicaAssignTaskList(taskList); @@ -459,9 +459,9 @@ RouterSequentialModifyExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - MultiPlan *multiPlan = scanState->multiPlan; - bool hasReturning = multiPlan->hasReturning; - Job *workerJob = multiPlan->workerJob; + DistributedPlan *distributedPlan = scanState->distributedPlan; + bool hasReturning = distributedPlan->hasReturning; + Job *workerJob = distributedPlan->workerJob; List *taskList = workerJob->taskList; ListCell *taskCell = NULL; bool multipleTasks = list_length(taskList) > 1; @@ -506,10 +506,10 @@ RouterMultiModifyExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - MultiPlan *multiPlan = scanState->multiPlan; - Job *workerJob = multiPlan->workerJob; + DistributedPlan *distributedPlan = scanState->distributedPlan; + Job *workerJob = distributedPlan->workerJob; List *taskList = workerJob->taskList; - bool hasReturning = multiPlan->hasReturning; + bool hasReturning = distributedPlan->hasReturning; bool isModificationQuery = true; ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning); @@ -536,12 +536,12 @@ RouterSelectExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - MultiPlan *multiPlan = scanState->multiPlan; - Job *workerJob = multiPlan->workerJob; + DistributedPlan *distributedPlan = scanState->distributedPlan; + Job *workerJob = distributedPlan->workerJob; List *taskList = workerJob->taskList; /* we are taking locks on partitions of partitioned tables */ - LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); if (list_length(taskList) > 0) { @@ -693,7 +693,7 @@ static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTasks, bool expectResults) { - CmdType operation = scanState->multiPlan->operation; + CmdType operation = scanState->distributedPlan->operation; EState *executorState = scanState->customScanState.ss.ps.state; ParamListInfo paramListInfo = executorState->es_param_list_info; List *taskPlacementList = task->taskPlacementList; diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 2b77ef195..38b9f4fcd 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -32,22 +32,22 @@ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format /* - * JobExecutorType selects the executor type for the given multiPlan using the task - * executor type config value. The function then checks if the given multiPlan needs + * JobExecutorType selects the executor type for the given distributedPlan using the task + * executor type config value. The function then checks if the given distributedPlan needs * more resources than those provided to it by other config values, and issues * warnings accordingly. If the selected executor type cannot execute the given - * multiPlan, the function errors out. + * distributedPlan, the function errors out. */ MultiExecutorType -JobExecutorType(MultiPlan *multiPlan) +JobExecutorType(DistributedPlan *distributedPlan) { - Job *job = multiPlan->workerJob; + Job *job = distributedPlan->workerJob; List *workerNodeList = NIL; int workerNodeCount = 0; int taskCount = 0; double tasksPerNode = 0.; MultiExecutorType executorType = TaskExecutorType; - bool routerExecutablePlan = multiPlan->routerExecutable; + bool routerExecutablePlan = distributedPlan->routerExecutable; /* check if can switch to router executor */ if (routerExecutablePlan) @@ -56,12 +56,12 @@ JobExecutorType(MultiPlan *multiPlan) return MULTI_EXECUTOR_ROUTER; } - if (multiPlan->insertSelectSubquery != NULL) + if (distributedPlan->insertSelectSubquery != NULL) { return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; } - Assert(multiPlan->operation == CMD_SELECT); + Assert(distributedPlan->operation == CMD_SELECT); workerNodeList = ActiveReadableNodeList(); workerNodeCount = list_length(workerNodeList); diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 5604bba07..ab6e5ac83 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -45,7 +45,7 @@ #include "distributed/multi_copy.h" #include "distributed/multi_join_order.h" #include "distributed/multi_partitioning_utils.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/distributed_planner.c similarity index 93% rename from src/backend/distributed/planner/multi_planner.c rename to src/backend/distributed/planner/distributed_planner.c index 52681bcc3..43c552684 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * multi_planner.c + * distributed_planner.c * General Citus planner code. * * Copyright (c) 2012-2016, Citus Data, Inc. @@ -19,7 +19,7 @@ #include "distributed/insert_select_planner.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_partitioning_utils.h" @@ -74,8 +74,10 @@ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *origin static void AdjustParseTree(Query *parse, bool assignRTEIdentities, bool setPartitionedTablesInherited); static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier); -static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan); -static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, +static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, + DistributedPlan *distributedPlan); +static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, + DistributedPlan *distributedPlan, CustomScan *customScan); static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); static void CheckNodeIsDumpable(Node *node); @@ -89,7 +91,7 @@ static bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boun /* Distributed planner hook */ PlannedStmt * -multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) +distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { PlannedStmt *result = NULL; bool needsDistributedPlanning = NeedsDistributedPlanning(parse); @@ -289,9 +291,9 @@ IsModifyCommand(Query *query) * multi shard update or delete query. */ bool -IsMultiShardModifyPlan(MultiPlan *multiPlan) +IsMultiShardModifyPlan(DistributedPlan *distributedPlan) { - if (IsUpdateOrDelete(multiPlan) && IsMultiTaskPlan(multiPlan)) + if (IsUpdateOrDelete(distributedPlan) && IsMultiTaskPlan(distributedPlan)) { return true; } @@ -304,9 +306,9 @@ IsMultiShardModifyPlan(MultiPlan *multiPlan) * IsMultiTaskPlan returns true if job contains multiple tasks. */ bool -IsMultiTaskPlan(MultiPlan *multiPlan) +IsMultiTaskPlan(DistributedPlan *distributedPlan) { - Job *workerJob = multiPlan->workerJob; + Job *workerJob = distributedPlan->workerJob; if (workerJob != NULL && list_length(workerJob->taskList) > 1) { @@ -321,9 +323,9 @@ IsMultiTaskPlan(MultiPlan *multiPlan) * IsUpdateOrDelete returns true if the query performs update or delete. */ bool -IsUpdateOrDelete(MultiPlan *multiPlan) +IsUpdateOrDelete(DistributedPlan *distributedPlan) { - CmdType commandType = multiPlan->operation; + CmdType commandType = distributedPlan->operation; if (commandType == CMD_UPDATE || commandType == CMD_DELETE) { @@ -335,21 +337,21 @@ IsUpdateOrDelete(MultiPlan *multiPlan) /* - * IsModifyMultiPlan returns true if the multi plan performs modifications, + * IsModifyDistributedPlan returns true if the multi plan performs modifications, * false otherwise. */ bool -IsModifyMultiPlan(MultiPlan *multiPlan) +IsModifyDistributedPlan(DistributedPlan *distributedPlan) { - bool isModifyMultiPlan = false; - CmdType operation = multiPlan->operation; + bool isModifyDistributedPlan = false; + CmdType operation = distributedPlan->operation; if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) { - isModifyMultiPlan = true; + isModifyDistributedPlan = true; } - return isModifyMultiPlan; + return isModifyDistributedPlan; } @@ -362,7 +364,7 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query ParamListInfo boundParams, PlannerRestrictionContext *plannerRestrictionContext) { - MultiPlan *distributedPlan = NULL; + DistributedPlan *distributedPlan = NULL; PlannedStmt *resultPlan = NULL; bool hasUnresolvedParams = false; @@ -435,8 +437,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query CheckNodeIsDumpable((Node *) logicalPlan); /* Create the physical plan */ - distributedPlan = MultiPhysicalPlanCreate(logicalPlan, - plannerRestrictionContext); + distributedPlan = CreatePhysicalDistributedPlan(logicalPlan, + plannerRestrictionContext); /* distributed plan currently should always succeed or error out */ Assert(distributedPlan && distributedPlan->planningError == NULL); @@ -456,7 +458,7 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query { /* currently always should have a more specific error otherwise */ Assert(hasUnresolvedParams); - distributedPlan = CitusMakeNode(MultiPlan); + distributedPlan = CitusMakeNode(DistributedPlan); distributedPlan->planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "could not create distributed plan", @@ -505,31 +507,31 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query /* - * GetMultiPlan returns the associated MultiPlan for a CustomScan. + * GetDistributedPlan returns the associated DistributedPlan for a CustomScan. */ -MultiPlan * -GetMultiPlan(CustomScan *customScan) +DistributedPlan * +GetDistributedPlan(CustomScan *customScan) { Node *node = NULL; - MultiPlan *multiPlan = NULL; + DistributedPlan *distributedPlan = NULL; Assert(list_length(customScan->custom_private) == 1); node = (Node *) linitial(customScan->custom_private); - Assert(CitusIsA(node, MultiPlan)); + Assert(CitusIsA(node, DistributedPlan)); node = CheckNodeCopyAndSerialization(node); /* * When using prepared statements the same plan gets reused across * multiple statements and transactions. We make several modifications - * to the MultiPlan during execution such as assigning task placements + * to the DistributedPlan during execution such as assigning task placements * and evaluating functions and parameters. These changes should not * persist, so we always work on a copy. */ - multiPlan = (MultiPlan *) copyObject(node); + distributedPlan = (DistributedPlan *) copyObject(node); - return multiPlan; + return distributedPlan; } @@ -538,16 +540,16 @@ GetMultiPlan(CustomScan *customScan) * which can be run by the PostgreSQL executor. */ static PlannedStmt * -FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) +FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) { PlannedStmt *finalPlan = NULL; CustomScan *customScan = makeNode(CustomScan); - Node *multiPlanData = NULL; + Node *distributedPlanData = NULL; MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST; - if (!multiPlan->planningError) + if (!distributedPlan->planningError) { - executorType = JobExecutorType(multiPlan); + executorType = JobExecutorType(distributedPlan); } switch (executorType) @@ -583,7 +585,7 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) } } - if (IsMultiTaskPlan(multiPlan)) + if (IsMultiTaskPlan(distributedPlan)) { /* if it is not a single task executable plan, inform user according to the log level */ if (MultiTaskQueryLogLevel != MULTI_TASK_QUERY_INFO_OFF) @@ -597,16 +599,16 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) } } - multiPlan->relationIdList = localPlan->relationOids; + distributedPlan->relationIdList = localPlan->relationOids; - multiPlanData = (Node *) multiPlan; + distributedPlanData = (Node *) distributedPlan; - customScan->custom_private = list_make1(multiPlanData); + customScan->custom_private = list_make1(distributedPlanData); customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; - if (multiPlan->masterQuery) + if (distributedPlan->masterQuery) { - finalPlan = FinalizeNonRouterPlan(localPlan, multiPlan, customScan); + finalPlan = FinalizeNonRouterPlan(localPlan, distributedPlan, customScan); } else { @@ -623,12 +625,12 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) * and task-tracker executors. */ static PlannedStmt * -FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, +FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan, CustomScan *customScan) { PlannedStmt *finalPlan = NULL; - finalPlan = MasterNodeSelectPlan(multiPlan, customScan); + finalPlan = MasterNodeSelectPlan(distributedPlan, customScan); finalPlan->queryId = localPlan->queryId; finalPlan->utilityStmt = localPlan->utilityStmt; diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 9f61a6d40..21e4d4f01 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -38,9 +38,9 @@ #include "utils/lsyscache.h" -static MultiPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, - PlannerRestrictionContext * - plannerRestrictionContext); +static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext); static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, Query *originalQuery); static Task * RouterModifyTaskForShardInterval(Query *originalQuery, @@ -61,7 +61,7 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, subqueryRte, Oid * selectPartitionColumnTableId); -static MultiPlan * CreateCoordinatorInsertSelectPlan(Query *parse); +static DistributedPlan * CreateCoordinatorInsertSelectPlan(Query *parse); static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); static Query * WrapSubquery(Query *subquery); static bool CheckInsertSelectQuery(Query *query); @@ -172,11 +172,11 @@ CheckInsertSelectQuery(Query *query) * command to the workers and if that is not possible it creates a * plan for evaluating the SELECT on the coordinator. */ -MultiPlan * +DistributedPlan * CreateInsertSelectPlan(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext) { - MultiPlan *distributedPlan = NULL; + DistributedPlan *distributedPlan = NULL; distributedPlan = CreateDistributedInsertSelectPlan(originalQuery, plannerRestrictionContext); @@ -194,12 +194,12 @@ CreateInsertSelectPlan(Query *originalQuery, /* - * CreateDistributedInsertSelectPlan Creates a MultiPlan for distributed + * CreateDistributedInsertSelectPlan Creates a DistributedPlan for distributed * INSERT ... SELECT queries which could consists of multiple tasks. * - * The function never returns NULL, it errors out if cannot create the MultiPlan. + * The function never returns NULL, it errors out if cannot create the DistributedPlan. */ -static MultiPlan * +static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext) { @@ -208,7 +208,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */ Job *workerJob = NULL; uint64 jobId = INVALID_JOB_ID; - MultiPlan *multiPlan = CitusMakeNode(MultiPlan); + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); Oid targetRelationId = insertRte->relid; @@ -219,18 +219,19 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, bool allReferenceTables = relationRestrictionContext->allReferenceTables; bool safeToPushDownSubquery = false; - multiPlan->operation = originalQuery->commandType; + distributedPlan->operation = originalQuery->commandType; /* * Error semantics for INSERT ... SELECT queries are different than regular * modify queries. Thus, handle separately. */ - multiPlan->planningError = DistributedInsertSelectSupported(originalQuery, insertRte, - subqueryRte, - allReferenceTables); - if (multiPlan->planningError) + distributedPlan->planningError = DistributedInsertSelectSupported(originalQuery, + insertRte, + subqueryRte, + allReferenceTables); + if (distributedPlan->planningError) { - return multiPlan; + return distributedPlan; } safeToPushDownSubquery = SafeToPushDownSubquery(plannerRestrictionContext, @@ -238,7 +239,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, /* * Plan select query for each shard in the target table. Do so by replacing the - * partitioning qual parameter added in multi_planner() using the current shard's + * partitioning qual parameter added in distributed_planner() using the current shard's * actual boundary values. Also, add the current shard's boundary values to the * top level subquery to ensure that even if the partitioning qual is not distributed * to all the tables, we never run the queries on the shards that don't match with @@ -277,17 +278,17 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, workerJob->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); /* and finally the multi plan */ - multiPlan->workerJob = workerJob; - multiPlan->masterQuery = NULL; - multiPlan->routerExecutable = true; - multiPlan->hasReturning = false; + distributedPlan->workerJob = workerJob; + distributedPlan->masterQuery = NULL; + distributedPlan->routerExecutable = true; + distributedPlan->hasReturning = false; if (list_length(originalQuery->returningList) > 0) { - multiPlan->hasReturning = true; + distributedPlan->hasReturning = true; } - return multiPlan; + return distributedPlan; } @@ -423,7 +424,7 @@ SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, /* * RouterModifyTaskForShardInterval creates a modify task by - * replacing the partitioning qual parameter added in multi_planner() + * replacing the partitioning qual parameter added in distributed_planner() * with the shardInterval's boundary value. Then perform the normal * shard pruning on the subquery. Finally, checks if the target shardInterval * has exactly same placements with the select task's available anchor @@ -1134,7 +1135,7 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte, * CreatteCoordinatorInsertSelectPlan creates a query plan for a SELECT into a * distributed table. The query plan can also be executed on a worker in MX. */ -static MultiPlan * +static DistributedPlan * CreateCoordinatorInsertSelectPlan(Query *parse) { Query *insertSelectQuery = copyObject(parse); @@ -1144,15 +1145,15 @@ CreateCoordinatorInsertSelectPlan(Query *parse) RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); Oid targetRelationId = insertRte->relid; - MultiPlan *multiPlan = CitusMakeNode(MultiPlan); - multiPlan->operation = CMD_INSERT; + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); + distributedPlan->operation = CMD_INSERT; - multiPlan->planningError = + distributedPlan->planningError = CoordinatorInsertSelectSupported(insertSelectQuery); - if (multiPlan->planningError != NULL) + if (distributedPlan->planningError != NULL) { - return multiPlan; + return distributedPlan; } selectQuery = selectRte->subquery; @@ -1183,11 +1184,11 @@ CreateCoordinatorInsertSelectPlan(Query *parse) ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); - multiPlan->insertSelectSubquery = selectQuery; - multiPlan->insertTargetList = insertSelectQuery->targetList; - multiPlan->targetRelationId = targetRelationId; + distributedPlan->insertSelectSubquery = selectQuery; + distributedPlan->insertTargetList = insertSelectQuery->targetList; + distributedPlan->targetRelationId = targetRelationId; - return multiPlan; + return distributedPlan; } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index c02f10a6b..c5bc8d49f 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -30,7 +30,7 @@ #include "distributed/multi_logical_planner.h" #include "distributed/multi_master_planner.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/remote_commands.h" #include "distributed/placement_connection.h" @@ -112,7 +112,7 @@ void CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es) { CitusScanState *scanState = (CitusScanState *) node; - MultiPlan *multiPlan = scanState->multiPlan; + DistributedPlan *distributedPlan = scanState->distributedPlan; if (!ExplainDistributedQueries) { @@ -124,7 +124,7 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es ExplainOpenGroup("Distributed Query", "Distributed Query", true, es); - ExplainJob(multiPlan->workerJob, es); + ExplainJob(distributedPlan->workerJob, es); ExplainCloseGroup("Distributed Query", "Distributed Query", true, es); } @@ -140,8 +140,8 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es) { CitusScanState *scanState = (CitusScanState *) node; - MultiPlan *multiPlan = scanState->multiPlan; - Query *query = multiPlan->insertSelectSubquery; + DistributedPlan *distributedPlan = scanState->distributedPlan; + Query *query = distributedPlan->insertSelectSubquery; IntoClause *into = NULL; ParamListInfo params = NULL; char *queryString = NULL; diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index db9aea8be..36dbe4f8f 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -103,7 +103,7 @@ static bool AllTargetExpressionsAreColumnReferences(List *targetEntryList); static bool RangeTableListContainsOnlyReferenceTables(List *rangeTableList); static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); static bool FullCompositeFieldList(List *compositeFieldList); -static MultiNode * MultiPlanTree(Query *queryTree); +static MultiNode * MultiNodeTree(Query *queryTree); static void ErrorIfQueryNotSupported(Query *queryTree); static DeferredErrorMessage * DeferredErrorIfUnsupportedRecurringTuplesJoin( PlannerRestrictionContext *plannerRestrictionContext); @@ -162,13 +162,13 @@ static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNo * functions will be removed with upcoming subqery changes. */ static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); -static MultiNode * MultiSubqueryPlanTree(Query *originalQuery, +static MultiNode * SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree, PlannerRestrictionContext * plannerRestrictionContext); static List * SublinkList(Query *originalQuery); static bool ExtractSublinkWalker(Node *node, List **sublinkList); -static MultiNode * SubqueryPushdownMultiPlanTree(Query *queryTree); +static MultiNode * SubqueryPushdownMultiNodeTree(Query *queryTree); static List * CreateSubqueryTargetEntryList(List *columnList); static void UpdateVarMappingsForExtendedOpNode(List *columnList, @@ -203,24 +203,24 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, /* * We check the existence of subqueries in FROM clause on the modified query - * given that if postgres already flattened the subqueries, MultiPlanTree() + * given that if postgres already flattened the subqueries, MultiNodeTree() * can plan corresponding distributed plan. * * We also check the existence of subqueries in WHERE clause. Note that * this check needs to be done on the original query given that * standard_planner() may replace the sublinks with anti/semi joins and - * MultiPlanTree() cannot plan such queries. + * MultiNodeTree() cannot plan such queries. */ if (SubqueryEntryList(queryTree) != NIL || SublinkList(originalQuery) != NIL) { originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery, boundParams); - multiQueryNode = MultiSubqueryPlanTree(originalQuery, queryTree, + multiQueryNode = SubqueryMultiNodeTree(originalQuery, queryTree, plannerRestrictionContext); } else { - multiQueryNode = MultiPlanTree(queryTree); + multiQueryNode = MultiNodeTree(queryTree); } /* add a root node to serve as the permanent handle to the tree */ @@ -376,7 +376,7 @@ ExtractSublinkWalker(Node *node, List **sublinkList) /* - * MultiSubqueryPlanTree gets the query objects and returns logical plan + * SubqueryMultiNodeTree gets the query objects and returns logical plan * for subqueries. * * We currently have two different code paths for creating logic plan for subqueries: @@ -395,7 +395,7 @@ ExtractSublinkWalker(Node *node, List **sublinkList) * - If found errors, throw it */ static MultiNode * -MultiSubqueryPlanTree(Query *originalQuery, Query *queryTree, +SubqueryMultiNodeTree(Query *originalQuery, Query *queryTree, PlannerRestrictionContext *plannerRestrictionContext) { MultiNode *multiQueryNode = NULL; @@ -416,7 +416,7 @@ MultiSubqueryPlanTree(Query *originalQuery, Query *queryTree, plannerRestrictionContext); if (!subqueryPushdownError) { - multiQueryNode = SubqueryPushdownMultiPlanTree(originalQuery); + multiQueryNode = SubqueryPushdownMultiNodeTree(originalQuery); } else if (subqueryPushdownError) { @@ -450,7 +450,7 @@ MultiSubqueryPlanTree(Query *originalQuery, Query *queryTree, } /* all checks has passed, safe to create the multi plan */ - multiQueryNode = MultiPlanTree(queryTree); + multiQueryNode = MultiNodeTree(queryTree); } Assert(multiQueryNode != NULL); @@ -1591,7 +1591,7 @@ SubqueryEntryList(Query *queryTree) /* - * MultiPlanTree takes in a parsed query tree and uses that tree to construct a + * MultiNodeTree takes in a parsed query tree and uses that tree to construct a * logical plan. This plan is based on multi-relational algebra. This function * creates the logical plan in several steps. * @@ -1609,7 +1609,7 @@ SubqueryEntryList(Query *queryTree) * group, and limit nodes if they appear in the original query tree. */ static MultiNode * -MultiPlanTree(Query *queryTree) +MultiNodeTree(Query *queryTree) { List *rangeTableList = queryTree->rtable; List *targetEntryList = queryTree->targetList; @@ -1686,7 +1686,7 @@ MultiPlanTree(Query *queryTree) } /* recursively create child nested multitree */ - subqueryExtendedNode = MultiPlanTree(subqueryTree); + subqueryExtendedNode = MultiNodeTree(subqueryTree); SetChild((MultiUnaryNode *) subqueryCollectNode, (MultiNode *) subqueryNode); SetChild((MultiUnaryNode *) subqueryNode, subqueryExtendedNode); @@ -3708,7 +3708,7 @@ ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode, /* - * SubqueryPushdownMultiTree creates logical plan for subquery pushdown logic. + * SubqueryPushdownMultiNodeTree creates logical plan for subquery pushdown logic. * Note that this logic will be changed in next iterations, so we decoupled it * from other parts of code although it causes some code duplication. * @@ -3719,7 +3719,7 @@ ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNode, * down to workers without invoking join order planner. */ static MultiNode * -SubqueryPushdownMultiPlanTree(Query *queryTree) +SubqueryPushdownMultiNodeTree(Query *queryTree) { List *targetEntryList = queryTree->targetList; List *qualifierList = NIL; diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 2d7cb672c..79c8016de 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -15,7 +15,7 @@ #include "distributed/multi_master_planner.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" @@ -47,12 +47,12 @@ static Plan * BuildDistinctPlan(Query *masterQuery, Plan *subPlan); * filled into the tuple store inside provided custom scan. */ PlannedStmt * -MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *remoteScan) +MasterNodeSelectPlan(DistributedPlan *distributedPlan, CustomScan *remoteScan) { - Query *masterQuery = multiPlan->masterQuery; + Query *masterQuery = distributedPlan->masterQuery; PlannedStmt *masterSelectPlan = NULL; - Job *workerJob = multiPlan->workerJob; + Job *workerJob = distributedPlan->workerJob; List *workerTargetList = workerJob->jobQuery->targetList; List *masterTargetList = MasterTargetList(workerTargetList); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 4fc20931a..22ad67953 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -121,7 +121,7 @@ static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray, uint32 shardIntervalCount); /* Local functions forward declarations for task list creation and helper functions */ -static bool MultiPlanRouterExecutable(MultiPlan *multiPlan); +static bool DistributedPlanRouterExecutable(DistributedPlan *distributedPlan); static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); static List * SubquerySqlTaskList(Job *job, @@ -198,15 +198,15 @@ static uint32 FinalTargetEntryCount(List *targetEntryList); /* - * MultiPhysicalPlanCreate is the entry point for physical plan generation. The + * CreatePhysicalDistributedPlan is the entry point for physical plan generation. The * function builds the physical plan; this plan includes the list of tasks to be * executed on worker nodes, and the final query to run on the master node. */ -MultiPlan * -MultiPhysicalPlanCreate(MultiTreeRoot *multiTree, - PlannerRestrictionContext *plannerRestrictionContext) +DistributedPlan * +CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree, + PlannerRestrictionContext *plannerRestrictionContext) { - MultiPlan *multiPlan = NULL; + DistributedPlan *distributedPlan = NULL; Job *workerJob = NULL; Query *masterQuery = NULL; List *masterDependedJobList = NIL; @@ -221,18 +221,18 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree, masterDependedJobList = list_make1(workerJob); masterQuery = BuildJobQuery((MultiNode *) multiTree, masterDependedJobList); - multiPlan = CitusMakeNode(MultiPlan); - multiPlan->workerJob = workerJob; - multiPlan->masterQuery = masterQuery; - multiPlan->routerExecutable = MultiPlanRouterExecutable(multiPlan); - multiPlan->operation = CMD_SELECT; + distributedPlan = CitusMakeNode(DistributedPlan); + distributedPlan->workerJob = workerJob; + distributedPlan->masterQuery = masterQuery; + distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan); + distributedPlan->operation = CMD_SELECT; - return multiPlan; + return distributedPlan; } /* - * MultiPlanRouterExecutable returns true if the input multiPlan is + * DistributedPlanRouterExecutable returns true if the input distributedPlan is * router executable. * * Note that all the multi plans that are created by router planner are @@ -240,10 +240,10 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree, * for multi plans that are not generated by router planner. */ static bool -MultiPlanRouterExecutable(MultiPlan *multiPlan) +DistributedPlanRouterExecutable(DistributedPlan *distributedPlan) { - Query *masterQuery = multiPlan->masterQuery; - Job *job = multiPlan->workerJob; + Query *masterQuery = distributedPlan->masterQuery; + Job *job = distributedPlan->workerJob; List *workerTaskList = job->taskList; int taskCount = list_length(workerTaskList); int dependedJobCount = list_length(job->dependedJobList); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index a61627527..743bc98d7 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -106,10 +106,10 @@ bool EnableRouterExecution = true; /* planner functions forward declarations */ -static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, - Query *query, - RelationRestrictionContext * - restrictionContext); +static DistributedPlan * CreateSingleTaskRouterPlan(Query *originalQuery, + Query *query, + RelationRestrictionContext * + restrictionContext); static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, bool *badCoalesce); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); @@ -158,7 +158,7 @@ static List * MultiShardModifyTaskList(Query *originalQuery, List *relationShard * SELECT statement. If planning fails either NULL is returned, or * ->planningError is set to a description of the failure. */ -MultiPlan * +DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext) { @@ -183,20 +183,20 @@ CreateRouterPlan(Query *originalQuery, Query *query, * statement. If planning fails ->planningError is set to a description of * the failure. */ -MultiPlan * +DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query, PlannerRestrictionContext *plannerRestrictionContext) { Job *job = NULL; - MultiPlan *multiPlan = CitusMakeNode(MultiPlan); + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); bool multiShardQuery = false; - multiPlan->operation = query->commandType; + distributedPlan->operation = query->commandType; - multiPlan->planningError = ModifyQuerySupported(query, multiShardQuery); - if (multiPlan->planningError != NULL) + distributedPlan->planningError = ModifyQuerySupported(query, multiShardQuery); + if (distributedPlan->planningError != NULL) { - return multiPlan; + return distributedPlan; } if (UpdateOrDeleteQuery(query)) @@ -204,31 +204,32 @@ CreateModifyPlan(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext = plannerRestrictionContext->relationRestrictionContext; - job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError); + job = RouterJob(originalQuery, restrictionContext, + &distributedPlan->planningError); } else { - job = RouterInsertJob(originalQuery, query, &multiPlan->planningError); + job = RouterInsertJob(originalQuery, query, &distributedPlan->planningError); } - if (multiPlan->planningError != NULL) + if (distributedPlan->planningError != NULL) { - return multiPlan; + return distributedPlan; } ereport(DEBUG2, (errmsg("Creating router plan"))); - multiPlan->workerJob = job; - multiPlan->masterQuery = NULL; - multiPlan->routerExecutable = true; - multiPlan->hasReturning = false; + distributedPlan->workerJob = job; + distributedPlan->masterQuery = NULL; + distributedPlan->routerExecutable = true; + distributedPlan->hasReturning = false; if (list_length(originalQuery->returningList) > 0) { - multiPlan->hasReturning = true; + distributedPlan->hasReturning = true; } - return multiPlan; + return distributedPlan; } @@ -239,26 +240,26 @@ CreateModifyPlan(Query *originalQuery, Query *query, * are router plannable by default. If query is not router plannable then either NULL is * returned, or the returned plan has planningError set to a description of the problem. */ -static MultiPlan * +static DistributedPlan * CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext) { Job *job = NULL; - MultiPlan *multiPlan = CitusMakeNode(MultiPlan); + DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); - multiPlan->operation = query->commandType; + distributedPlan->operation = query->commandType; /* FIXME: this should probably rather be inlined into CreateRouterPlan */ - multiPlan->planningError = ErrorIfQueryHasModifyingCTE(query); - if (multiPlan->planningError) + distributedPlan->planningError = ErrorIfQueryHasModifyingCTE(query); + if (distributedPlan->planningError) { - return multiPlan; + return distributedPlan; } /* we cannot have multi shard update/delete query via this code path */ - job = RouterJob(originalQuery, restrictionContext, &multiPlan->planningError); + job = RouterJob(originalQuery, restrictionContext, &distributedPlan->planningError); - if (multiPlan->planningError) + if (distributedPlan->planningError) { /* query cannot be handled by this planner */ return NULL; @@ -266,12 +267,12 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, ereport(DEBUG2, (errmsg("Creating router plan"))); - multiPlan->workerJob = job; - multiPlan->masterQuery = NULL; - multiPlan->routerExecutable = true; - multiPlan->hasReturning = false; + distributedPlan->workerJob = job; + distributedPlan->masterQuery = NULL; + distributedPlan->routerExecutable = true; + distributedPlan->hasReturning = false; - return multiPlan; + return distributedPlan; } diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 4fc40b535..69aff11d4 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -10,7 +10,7 @@ */ #include "postgres.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/pg_dist_partition.h" diff --git a/src/backend/distributed/planner/shard_pruning.c b/src/backend/distributed/planner/shard_pruning.c index b05f1674e..6c48d9649 100644 --- a/src/backend/distributed/planner/shard_pruning.c +++ b/src/backend/distributed/planner/shard_pruning.c @@ -54,7 +54,7 @@ #include "catalog/pg_collation.h" #include "catalog/pg_type.h" #include "distributed/metadata_cache.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_join_order.h" #include "distributed/multi_physical_planner.h" #include "distributed/shardinterval_utils.h" diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 1d2922a25..070a5980e 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -31,7 +31,7 @@ #include "distributed/multi_explain.h" #include "distributed/multi_join_order.h" #include "distributed/multi_logical_optimizer.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" @@ -189,7 +189,7 @@ _PG_init(void) RegisterNodes(); /* intercept planner */ - planner_hook = multi_planner; + planner_hook = distributed_planner; /* register utility hook */ #if (PG_VERSION_NUM >= 100000) diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index cd5c334ee..d2ea8c9fb 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -97,9 +97,9 @@ CopyNodeJob(COPYFUNC_ARGS) void -CopyNodeMultiPlan(COPYFUNC_ARGS) +CopyNodeDistributedPlan(COPYFUNC_ARGS) { - DECLARE_FROM_AND_NEW_NODE(MultiPlan); + DECLARE_FROM_AND_NEW_NODE(DistributedPlan); COPY_SCALAR_FIELD(operation); COPY_SCALAR_FIELD(hasReturning); diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 9b6491504..cfdd52846 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -15,7 +15,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/errormessage.h" #include "distributed/metadata_cache.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_server_executor.h" static const char *CitusNodeTagNamesD[] = { @@ -31,7 +31,7 @@ static const char *CitusNodeTagNamesD[] = { "MultiExtendedOp", "Job", "MapMergeJob", - "MultiPlan", + "DistributedPlan", "Task", "TaskExecution", "ShardInterval", @@ -383,7 +383,7 @@ EqualUnsupportedCitusNode(const struct ExtensibleNode *a, /* *INDENT-ON* */ const ExtensibleNodeMethods nodeMethods[] = { - DEFINE_NODE_METHODS(MultiPlan), + DEFINE_NODE_METHODS(DistributedPlan), DEFINE_NODE_METHODS(Job), DEFINE_NODE_METHODS(ShardInterval), DEFINE_NODE_METHODS(MapMergeJob), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 24241ef90..fc875024d 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -25,7 +25,7 @@ #include "distributed/errormessage.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/master_metadata_utility.h" #include "lib/stringinfo.h" @@ -173,11 +173,11 @@ OutMultiTreeRoot(OUTFUNC_ARGS) void -OutMultiPlan(OUTFUNC_ARGS) +OutDistributedPlan(OUTFUNC_ARGS) { - WRITE_LOCALS(MultiPlan); + WRITE_LOCALS(DistributedPlan); - WRITE_NODE_TYPE("MULTIPLAN"); + WRITE_NODE_TYPE("DISTRIBUTEDPLAN"); WRITE_INT_FIELD(operation); WRITE_BOOL_FIELD(hasReturning); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index db6ca6757..f3f9adc5d 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -15,7 +15,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/errormessage.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_server_executor.h" #include "nodes/parsenodes.h" #include "nodes/readfuncs.h" @@ -195,9 +195,9 @@ ReadJob(READFUNC_ARGS) READFUNC_RET -ReadMultiPlan(READFUNC_ARGS) +ReadDistributedPlan(READFUNC_ARGS) { - READ_LOCALS(MultiPlan); + READ_LOCALS(DistributedPlan); READ_INT_FIELD(operation); READ_BOOL_FIELD(hasReturning); diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 9e08d95a1..2c1e72c20 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -22,7 +22,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_partitioning_utils.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/multi_router_executor.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index 4e45b600e..6af2b52d7 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -16,7 +16,7 @@ #include "catalog/pg_collation.h" #include "catalog/pg_type.h" #include "distributed/metadata_cache.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "distributed/shard_pruning.h" #include "distributed/shardinterval_utils.h" #include "distributed/pg_dist_partition.h" diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 97072fb65..451803e2b 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -43,7 +43,7 @@ extern void RegisterNodes(void); ExtensibleNode *source_node extern READFUNC_RET ReadJob(READFUNC_ARGS); -extern READFUNC_RET ReadMultiPlan(READFUNC_ARGS); +extern READFUNC_RET ReadDistributedPlan(READFUNC_ARGS); extern READFUNC_RET ReadShardInterval(READFUNC_ARGS); extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); @@ -56,7 +56,7 @@ extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS); extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS); extern void OutJob(OUTFUNC_ARGS); -extern void OutMultiPlan(OUTFUNC_ARGS); +extern void OutDistributedPlan(OUTFUNC_ARGS); extern void OutShardInterval(OUTFUNC_ARGS); extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS); @@ -78,7 +78,7 @@ extern void OutMultiCartesianProduct(OUTFUNC_ARGS); extern void OutMultiExtendedOp(OUTFUNC_ARGS); extern void CopyNodeJob(COPYFUNC_ARGS); -extern void CopyNodeMultiPlan(COPYFUNC_ARGS); +extern void CopyNodeDistributedPlan(COPYFUNC_ARGS); extern void CopyNodeShardInterval(COPYFUNC_ARGS); extern void CopyNodeMapMergeJob(COPYFUNC_ARGS); extern void CopyNodeShardPlacement(COPYFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index 1497cf626..4e057290a 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -55,7 +55,7 @@ typedef enum CitusNodeTag T_MultiExtendedOp, T_Job, T_MapMergeJob, - T_MultiPlan, + T_DistributedPlan, T_Task, T_TaskExecution, T_ShardInterval, diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/distributed_planner.h similarity index 77% rename from src/include/distributed/multi_planner.h rename to src/include/distributed/distributed_planner.h index 71359759f..f97679969 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -1,14 +1,14 @@ /*------------------------------------------------------------------------- * - * multi_planner.h + * distributed_planner.h * General Citus planner code. * * Copyright (c) 2012-2016, Citus Data, Inc. *------------------------------------------------------------------------- */ -#ifndef MULTI_PLANNER_H -#define MULTI_PLANNER_H +#ifndef DISTRIBUTED_PLANNER_H +#define DISTRIBUTED_PLANNER_H #include "nodes/plannodes.h" #include "nodes/relation.h" @@ -71,9 +71,9 @@ typedef struct RelationShard } RelationShard; -extern PlannedStmt * multi_planner(Query *parse, int cursorOptions, - ParamListInfo boundParams); -extern struct MultiPlan * GetMultiPlan(CustomScan *node); +extern PlannedStmt * distributed_planner(Query *parse, int cursorOptions, + ParamListInfo boundParams); +extern struct DistributedPlan * GetDistributedPlan(CustomScan *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index, RangeTblEntry *rte); extern void multi_join_restriction_hook(PlannerInfo *root, @@ -83,13 +83,13 @@ extern void multi_join_restriction_hook(PlannerInfo *root, JoinType jointype, JoinPathExtraData *extra); extern bool IsModifyCommand(Query *query); -extern bool IsUpdateOrDelete(struct MultiPlan *multiPlan); -extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan); -extern bool IsMultiTaskPlan(struct MultiPlan *multiPlan); -extern bool IsMultiShardModifyPlan(struct MultiPlan *multiPlan); +extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan); +extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); +extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); +extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern int GetRTEIdentity(RangeTblEntry *rte); -#endif /* MULTI_PLANNER_H */ +#endif /* DISTRIBUTED_PLANNER_H */ diff --git a/src/include/distributed/insert_select_planner.h b/src/include/distributed/insert_select_planner.h index f1a1362b5..c529e32a7 100644 --- a/src/include/distributed/insert_select_planner.h +++ b/src/include/distributed/insert_select_planner.h @@ -17,7 +17,7 @@ #include "postgres.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "nodes/plannodes.h" @@ -30,9 +30,9 @@ extern Query * ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *subqueryRte); extern void CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es); -extern MultiPlan * CreateInsertSelectPlan(Query *originalQuery, - PlannerRestrictionContext * - plannerRestrictionContext); +extern DistributedPlan * CreateInsertSelectPlan(Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext); #endif /* INSERT_SELECT_PLANNER_H */ diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 775e598c2..69af67a81 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -21,7 +21,7 @@ typedef struct CitusScanState { CustomScanState customScanState; /* underlying custom scan node */ - MultiPlan *multiPlan; /* distributed execution plan */ + DistributedPlan *distributedPlan; /* distributed execution plan */ MultiExecutorType executorType; /* distributed executor type */ bool finishedRemoteScan; /* flag to check if remote scan is finished */ Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ diff --git a/src/include/distributed/multi_master_planner.h b/src/include/distributed/multi_master_planner.h index 54f4e2d95..31362c094 100644 --- a/src/include/distributed/multi_master_planner.h +++ b/src/include/distributed/multi_master_planner.h @@ -20,9 +20,9 @@ /* Function declarations for building local plans on the master node */ -struct MultiPlan; +struct DistributedPlan; struct CustomScan; -extern PlannedStmt * MasterNodeSelectPlan(struct MultiPlan *multiPlan, +extern PlannedStmt * MasterNodeSelectPlan(struct DistributedPlan *distributedPlan, struct CustomScan *dataScan); extern Unique * make_unique_from_sortclauses(Plan *lefttree, List *distinctList); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 7b491ca2d..ee183c3f9 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -22,7 +22,7 @@ #include "distributed/errormessage.h" #include "distributed/master_metadata_utility.h" #include "distributed/multi_logical_planner.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" #include "utils/array.h" @@ -217,22 +217,38 @@ typedef struct JoinSequenceNode /* - * MultiPlan + * DistributedPlan contains all information necessary to execute a + * distribute query. */ -typedef struct MultiPlan +typedef struct DistributedPlan { CitusNode type; + + /* type of command to execute (SELECT/INSERT/...) */ CmdType operation; + /* specifies whether a DML command has a RETURNING */ bool hasReturning; + + /* job tree containing the tasks to be executed on workers */ Job *workerJob; + + /* local query that merges results from the workers */ Query *masterQuery; + + /* a router executable query is executed entirely on a worker */ bool routerExecutable; + + /* which relations are accessed by this distributed plan */ List *relationIdList; - /* INSERT ... SELECT via coordinator only */ + /* SELECT query in an INSERT ... SELECT via the coordinator */ Query *insertSelectSubquery; + + /* target list of an INSERT ... SELECT via the coordinator */ List *insertTargetList; + + /* target relation of an INSERT ... SELECT via the coordinator */ Oid targetRelationId; /* @@ -241,7 +257,7 @@ typedef struct MultiPlan * or if prepared statement parameters prevented successful planning. */ DeferredErrorMessage *planningError; -} MultiPlan; +} DistributedPlan; /* OperatorCacheEntry contains information for each element in OperatorCache */ @@ -263,9 +279,9 @@ extern bool EnableUniqueJobIds; /* Function declarations for building physical plans and constructing queries */ -extern MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree, - PlannerRestrictionContext * - plannerRestrictionContext); +extern DistributedPlan * CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree, + PlannerRestrictionContext * + plannerRestrictionContext); extern StringInfo ShardFetchQueryString(uint64 shardId); extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryString); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 789051228..153fdcaf0 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -17,7 +17,7 @@ #include "distributed/errormessage.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" #include "nodes/parsenodes.h" @@ -26,11 +26,11 @@ extern bool EnableRouterExecution; -extern MultiPlan * CreateRouterPlan(Query *originalQuery, Query *query, - RelationRestrictionContext *restrictionContext); -extern MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query, - PlannerRestrictionContext * - plannerRestrictionContext); +extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext); +extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query, + PlannerRestrictionContext * + plannerRestrictionContext); extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, RelationRestrictionContext * restrictionContext, diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 24df83037..78fa5f3cb 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -195,7 +195,7 @@ extern void MultiRealTimeExecute(Job *job); extern void MultiTaskTrackerExecute(Job *job); /* Function declarations common to more than one executor */ -extern MultiExecutorType JobExecutorType(MultiPlan *multiPlan); +extern MultiExecutorType JobExecutorType(DistributedPlan *distributedPlan); extern void RemoveJobDirectory(uint64 jobId); extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus); extern void CleanupTaskExecution(TaskExecution *taskExecution); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index f0fcd33e9..ea49af5fe 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -12,7 +12,7 @@ #ifndef RELATION_RESTRICTION_EQUIVALENCE_H #define RELATION_RESTRICTION_EQUIVALENCE_H -#include "distributed/multi_planner.h" +#include "distributed/distributed_planner.h" extern bool ContainsUnionSubquery(Query *queryTree);