diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c new file mode 100644 index 000000000..055ac476b --- /dev/null +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -0,0 +1,311 @@ +/*------------------------------------------------------------------------- + * + * citus_custom_scan.c + * + * Definitions of custom scan methods for all executor types. + * + * Copyright (c) 2012-2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "miscadmin.h" + +#include "commands/copy.h" +#include "distributed/citus_custom_scan.h" +#include "distributed/insert_select_executor.h" +#include "distributed/insert_select_planner.h" +#include "distributed/multi_server_executor.h" +#include "distributed/multi_router_executor.h" +#include "distributed/multi_router_planner.h" +#include "distributed/worker_protocol.h" +#include "executor/executor.h" +#include "nodes/makefuncs.h" +#include "utils/memutils.h" +#include "utils/rel.h" + + +/* functions for creating custom scan nodes */ +static Node * RealTimeCreateScan(CustomScan *scan); +static Node * TaskTrackerCreateScan(CustomScan *scan); +static Node * RouterCreateScan(CustomScan *scan); +static Node * CoordinatorInsertSelectCreateScan(CustomScan *scan); +static Node * DelayedErrorCreateScan(CustomScan *scan); + +/* functions that are common to different scans */ +static void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags); +static void CitusEndScan(CustomScanState *node); +static void CitusReScan(CustomScanState *node); + + +/* create custom scan methods for all executors */ +CustomScanMethods RealTimeCustomScanMethods = { + "Citus Real-Time", + RealTimeCreateScan +}; + +CustomScanMethods TaskTrackerCustomScanMethods = { + "Citus Task-Tracker", + TaskTrackerCreateScan +}; + +CustomScanMethods RouterCustomScanMethods = { + "Citus Router", + RouterCreateScan +}; + +CustomScanMethods CoordinatorInsertSelectCustomScanMethods = { + "Citus INSERT ... SELECT via coordinator", + CoordinatorInsertSelectCreateScan +}; + +CustomScanMethods DelayedErrorCustomScanMethods = { + "Citus Delayed Error", + DelayedErrorCreateScan +}; + + +/* + * Define executor methods for the different executor types. + */ +static CustomExecMethods RealTimeCustomExecMethods = { + .CustomName = "RealTimeScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = RealTimeExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods TaskTrackerCustomExecMethods = { + .CustomName = "TaskTrackerScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = TaskTrackerExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods RouterSequentialModifyCustomExecMethods = { + .CustomName = "RouterSequentialModifyScan", + .BeginCustomScan = CitusModifyBeginScan, + .ExecCustomScan = RouterSequentialModifyExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods RouterMultiModifyCustomExecMethods = { + .CustomName = "RouterMultiModifyScan", + .BeginCustomScan = CitusModifyBeginScan, + .ExecCustomScan = RouterMultiModifyExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods RouterSelectCustomExecMethods = { + .CustomName = "RouterSelectScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = RouterSelectExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { + .CustomName = "CoordinatorInsertSelectScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = CoordinatorInsertSelectExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CoordinatorInsertSelectExplainScan +}; + + +/* + * Let PostgreSQL know about Citus' custom scan nodes. + */ +void +RegisterCitusCustomScanMethods(void) +{ + RegisterCustomScanMethods(&RealTimeCustomScanMethods); + RegisterCustomScanMethods(&TaskTrackerCustomScanMethods); + RegisterCustomScanMethods(&RouterCustomScanMethods); + RegisterCustomScanMethods(&CoordinatorInsertSelectCustomScanMethods); + RegisterCustomScanMethods(&DelayedErrorCustomScanMethods); +} + + +/* + * RealTimeCreateScan creates the scan state for real-time executor queries. + */ +static Node * +RealTimeCreateScan(CustomScan *scan) +{ + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + + scanState->executorType = MULTI_EXECUTOR_REAL_TIME; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->distributedPlan = GetDistributedPlan(scan); + + scanState->customScanState.methods = &RealTimeCustomExecMethods; + + return (Node *) scanState; +} + + +/* + * TaskTrackerCreateScan creates the scan state for task-tracker executor queries. + */ +static Node * +TaskTrackerCreateScan(CustomScan *scan) +{ + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + + scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->distributedPlan = GetDistributedPlan(scan); + + scanState->customScanState.methods = &TaskTrackerCustomExecMethods; + + return (Node *) scanState; +} + + +/* + * RouterCreateScan creates the scan state for router executor queries. + */ +static Node * +RouterCreateScan(CustomScan *scan) +{ + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + DistributedPlan *distributedPlan = NULL; + Job *workerJob = NULL; + List *taskList = NIL; + bool isModificationQuery = false; + + scanState->executorType = MULTI_EXECUTOR_ROUTER; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->distributedPlan = GetDistributedPlan(scan); + + distributedPlan = scanState->distributedPlan; + workerJob = distributedPlan->workerJob; + taskList = workerJob->taskList; + + isModificationQuery = IsModifyDistributedPlan(distributedPlan); + + /* check whether query has at most one shard */ + if (list_length(taskList) <= 1) + { + if (isModificationQuery) + { + scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods; + } + else + { + scanState->customScanState.methods = &RouterSelectCustomExecMethods; + } + } + else + { + Assert(isModificationQuery); + + if (IsMultiRowInsert(workerJob->jobQuery) || + (IsUpdateOrDelete(distributedPlan) && + MultiShardConnectionType == SEQUENTIAL_CONNECTION)) + { + /* + * Multi shard update deletes while multi_shard_modify_mode equals + * to 'sequential' or Multi-row INSERT are executed sequentially + * instead of using parallel connections. + */ + scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods; + } + else + { + scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods; + } + } + + return (Node *) scanState; +} + + +/* + * CoordinatorInsertSelectCrateScan creates the scan state for executing + * INSERT..SELECT into a distributed table via the coordinator. + */ +static Node * +CoordinatorInsertSelectCreateScan(CustomScan *scan) +{ + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + + scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->distributedPlan = GetDistributedPlan(scan); + + scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods; + + return (Node *) scanState; +} + + +/* + * DelayedErrorCreateScan is only called if we could not plan for the given + * query. This is the case when a plan is not ready for execution because + * CreateDistributedPlan() couldn't find a plan due to unresolved prepared + * statement parameters, but didn't error out, because we expect custom plans + * to come to our rescue. But sql (not plpgsql) functions unfortunately don't + * go through a codepath supporting custom plans. Here, we error out with this + * delayed error message. + */ +static Node * +DelayedErrorCreateScan(CustomScan *scan) +{ + DistributedPlan *distributedPlan = GetDistributedPlan(scan); + + /* raise the deferred error */ + RaiseDeferredError(distributedPlan->planningError, ERROR); + + return NULL; +} + + +/* + * CitusSelectBeginScan is an empty function for BeginCustomScan callback. + */ +static void +CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags) +{ + /* just an empty function */ +} + + +/* + * CitusEndScan is used to clean up tuple store of the given custom scan state. + */ +static void +CitusEndScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + + if (scanState->tuplestorestate) + { + tuplestore_end(scanState->tuplestorestate); + scanState->tuplestorestate = NULL; + } +} + + +/* + * CitusReScan is just a place holder for rescan callback. Currently, we don't + * support rescan given that there is not any way to reach this code path. + */ +static void +CitusReScan(CustomScanState *node) +{ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("rescan is unsupported"), + errdetail("We don't expect this code path to be executed."))); +} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 3747be4bf..7216bc5f5 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -15,6 +15,7 @@ #include "access/xact.h" #include "catalog/dependency.h" #include "catalog/namespace.h" +#include "distributed/citus_custom_scan.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" #include "distributed/multi_copy.h" @@ -41,265 +42,44 @@ /* controls the connection type for multi shard update/delete queries */ int MultiShardConnectionType = PARALLEL_CONNECTION; -/* - * Define executor methods for the different executor types. - */ -static CustomExecMethods RealTimeCustomExecMethods = { - .CustomName = "RealTimeScan", - .BeginCustomScan = CitusSelectBeginScan, - .ExecCustomScan = RealTimeExecScan, - .EndCustomScan = CitusEndScan, - .ReScanCustomScan = CitusReScan, - .ExplainCustomScan = CitusExplainScan -}; -static CustomExecMethods TaskTrackerCustomExecMethods = { - .CustomName = "TaskTrackerScan", - .BeginCustomScan = CitusSelectBeginScan, - .ExecCustomScan = TaskTrackerExecScan, - .EndCustomScan = CitusEndScan, - .ReScanCustomScan = CitusReScan, - .ExplainCustomScan = CitusExplainScan -}; - -static CustomExecMethods RouterSequentialModifyCustomExecMethods = { - .CustomName = "RouterSequentialModifyScan", - .BeginCustomScan = CitusModifyBeginScan, - .ExecCustomScan = RouterSequentialModifyExecScan, - .EndCustomScan = CitusEndScan, - .ReScanCustomScan = CitusReScan, - .ExplainCustomScan = CitusExplainScan -}; - -static CustomExecMethods RouterMultiModifyCustomExecMethods = { - .CustomName = "RouterMultiModifyScan", - .BeginCustomScan = CitusModifyBeginScan, - .ExecCustomScan = RouterMultiModifyExecScan, - .EndCustomScan = CitusEndScan, - .ReScanCustomScan = CitusReScan, - .ExplainCustomScan = CitusExplainScan -}; - -static CustomExecMethods RouterSelectCustomExecMethods = { - .CustomName = "RouterSelectScan", - .BeginCustomScan = CitusSelectBeginScan, - .ExecCustomScan = RouterSelectExecScan, - .EndCustomScan = CitusEndScan, - .ReScanCustomScan = CitusReScan, - .ExplainCustomScan = CitusExplainScan -}; - -static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { - .CustomName = "CoordinatorInsertSelectScan", - .BeginCustomScan = CitusSelectBeginScan, - .ExecCustomScan = CoordinatorInsertSelectExecScan, - .EndCustomScan = CitusEndScan, - .ReScanCustomScan = CitusReScan, - .ExplainCustomScan = CoordinatorInsertSelectExplainScan -}; - - -/* local function forward declarations */ -static void PrepareMasterJobDirectory(Job *workerJob); -static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); +/* ocal function forward declarations */ static Relation StubRelation(TupleDesc tupleDescriptor); /* - * RealTimeCreateScan creates the scan state for real-time executor queries. - */ -Node * -RealTimeCreateScan(CustomScan *scan) -{ - CitusScanState *scanState = palloc0(sizeof(CitusScanState)); - - scanState->executorType = MULTI_EXECUTOR_REAL_TIME; - scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->distributedPlan = GetDistributedPlan(scan); - - scanState->customScanState.methods = &RealTimeCustomExecMethods; - - return (Node *) scanState; -} - - -/* - * TaskTrackerCreateScan creates the scan state for task-tracker executor queries. - */ -Node * -TaskTrackerCreateScan(CustomScan *scan) -{ - CitusScanState *scanState = palloc0(sizeof(CitusScanState)); - - scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER; - scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->distributedPlan = GetDistributedPlan(scan); - - scanState->customScanState.methods = &TaskTrackerCustomExecMethods; - - return (Node *) scanState; -} - - -/* - * RouterCreateScan creates the scan state for router executor queries. - */ -Node * -RouterCreateScan(CustomScan *scan) -{ - CitusScanState *scanState = palloc0(sizeof(CitusScanState)); - DistributedPlan *distributedPlan = NULL; - Job *workerJob = NULL; - List *taskList = NIL; - bool isModificationQuery = false; - - scanState->executorType = MULTI_EXECUTOR_ROUTER; - scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->distributedPlan = GetDistributedPlan(scan); - - distributedPlan = scanState->distributedPlan; - workerJob = distributedPlan->workerJob; - taskList = workerJob->taskList; - - isModificationQuery = IsModifyDistributedPlan(distributedPlan); - - /* check whether query has at most one shard */ - if (list_length(taskList) <= 1) - { - if (isModificationQuery) - { - scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods; - } - else - { - scanState->customScanState.methods = &RouterSelectCustomExecMethods; - } - } - else - { - Assert(isModificationQuery); - - if (IsMultiRowInsert(workerJob->jobQuery) || - (IsUpdateOrDelete(distributedPlan) && - MultiShardConnectionType == SEQUENTIAL_CONNECTION)) - { - /* - * Multi shard update deletes while multi_shard_modify_mode equals - * to 'sequential' or Multi-row INSERT are executed sequentially - * instead of using parallel connections. - */ - scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods; - } - else - { - scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods; - } - } - - return (Node *) scanState; -} - - -/* - * CoordinatorInsertSelectCrateScan creates the scan state for executing - * INSERT..SELECT into a distributed table via the coordinator. - */ -Node * -CoordinatorInsertSelectCreateScan(CustomScan *scan) -{ - CitusScanState *scanState = palloc0(sizeof(CitusScanState)); - - scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; - scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->distributedPlan = GetDistributedPlan(scan); - - scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods; - - return (Node *) scanState; -} - - -/* - * DelayedErrorCreateScan is only called if we could not plan for the given - * query. This is the case when a plan is not ready for execution because - * CreateDistributedPlan() couldn't find a plan due to unresolved prepared - * statement parameters, but didn't error out, because we expect custom plans - * to come to our rescue. But sql (not plpgsql) functions unfortunately don't - * go through a codepath supporting custom plans. Here, we error out with this - * delayed error message. - */ -Node * -DelayedErrorCreateScan(CustomScan *scan) -{ - DistributedPlan *distributedPlan = GetDistributedPlan(scan); - - /* raise the deferred error */ - RaiseDeferredError(distributedPlan->planningError, ERROR); - - return NULL; -} - - -/* - * CitusSelectBeginScan is an empty function for BeginCustomScan callback. - */ -void -CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags) -{ - /* just an empty function */ -} - - -/* - * RealTimeExecScan is a callback function which returns next tuple from a real-time - * execution. In the first call, it executes distributed real-time plan and loads - * results from temporary files into custom scan's tuple store. Then, it returns - * tuples one by one from this tuple store. + * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the + * given Citus scan node and returns it. It returns null if all tuples are read + * from the tuple store. */ TupleTableSlot * -RealTimeExecScan(CustomScanState *node) +ReturnTupleFromTuplestore(CitusScanState *scanState) { - CitusScanState *scanState = (CitusScanState *) node; + Tuplestorestate *tupleStore = scanState->tuplestorestate; TupleTableSlot *resultSlot = NULL; + ScanDirection scanDirection = NoMovementScanDirection; + bool forwardScanDirection = true; - if (!scanState->finishedRemoteScan) + if (tupleStore == NULL) { - DistributedPlan *distributedPlan = scanState->distributedPlan; - Job *workerJob = distributedPlan->workerJob; - - /* we are taking locks on partitions of partitioned tables */ - LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); - - PrepareMasterJobDirectory(workerJob); - MultiRealTimeExecute(workerJob); - - LoadTuplesIntoTupleStore(scanState, workerJob); - - scanState->finishedRemoteScan = true; + return NULL; } - resultSlot = ReturnTupleFromTuplestore(scanState); + scanDirection = scanState->customScanState.ss.ps.state->es_direction; + Assert(ScanDirectionIsValid(scanDirection)); + + if (ScanDirectionIsBackward(scanDirection)) + { + forwardScanDirection = false; + } + + resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot; + tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, resultSlot); return resultSlot; } -/* - * PrepareMasterJobDirectory creates a directory on the master node to keep job - * execution results. We also register this directory for automatic cleanup on - * portal delete. - */ -static void -PrepareMasterJobDirectory(Job *workerJob) -{ - StringInfo jobDirectoryName = MasterJobDirectoryName(workerJob->jobId); - CitusCreateDirectory(jobDirectoryName); - - ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner); - ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId); -} - - /* * Load data collected by real-time or task-tracker executors into the tuplestore * of CitusScanState. For that, we first create a tuple store, and then copy the @@ -308,7 +88,7 @@ PrepareMasterJobDirectory(Job *workerJob) * Note that in the long term it'd be a lot better if Multi*Execute() directly * filled the tuplestores, but that's a fair bit of work. */ -static void +void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) { CustomScanState customScanState = citusScanState->customScanState; @@ -415,99 +195,3 @@ StubRelation(TupleDesc tupleDescriptor) return stubRelation; } - - -/* - * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the - * given Citus scan node and returns it. It returns null if all tuples are read - * from the tuple store. - */ -TupleTableSlot * -ReturnTupleFromTuplestore(CitusScanState *scanState) -{ - Tuplestorestate *tupleStore = scanState->tuplestorestate; - TupleTableSlot *resultSlot = NULL; - ScanDirection scanDirection = NoMovementScanDirection; - bool forwardScanDirection = true; - - if (tupleStore == NULL) - { - return NULL; - } - - scanDirection = scanState->customScanState.ss.ps.state->es_direction; - Assert(ScanDirectionIsValid(scanDirection)); - - if (ScanDirectionIsBackward(scanDirection)) - { - forwardScanDirection = false; - } - - resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot; - tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, resultSlot); - - return resultSlot; -} - - -/* - * TaskTrackerExecScan is a callback function which returns next tuple from a - * task-tracker execution. In the first call, it executes distributed task-tracker - * plan and loads results from temporary files into custom scan's tuple store. - * Then, it returns tuples one by one from this tuple store. - */ -TupleTableSlot * -TaskTrackerExecScan(CustomScanState *node) -{ - CitusScanState *scanState = (CitusScanState *) node; - TupleTableSlot *resultSlot = NULL; - - if (!scanState->finishedRemoteScan) - { - DistributedPlan *distributedPlan = scanState->distributedPlan; - Job *workerJob = distributedPlan->workerJob; - - /* we are taking locks on partitions of partitioned tables */ - LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); - - PrepareMasterJobDirectory(workerJob); - MultiTaskTrackerExecute(workerJob); - - LoadTuplesIntoTupleStore(scanState, workerJob); - - scanState->finishedRemoteScan = true; - } - - resultSlot = ReturnTupleFromTuplestore(scanState); - - return resultSlot; -} - - -/* - * CitusEndScan is used to clean up tuple store of the given custom scan state. - */ -void -CitusEndScan(CustomScanState *node) -{ - CitusScanState *scanState = (CitusScanState *) node; - - if (scanState->tuplestorestate) - { - tuplestore_end(scanState->tuplestorestate); - scanState->tuplestorestate = NULL; - } -} - - -/* - * CitusReScan is just a place holder for rescan callback. Currently, we don't - * support rescan given that there is not any way to reach this code path. - */ -void -CitusReScan(CustomScanState *node) -{ - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("rescan is unsupported"), - errdetail("We don't expect this code path to be executed."))); -} diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index f3f49a5dc..9b7ea9388 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -23,10 +23,14 @@ #include #include "commands/dbcommands.h" +#include "distributed/citus_custom_scan.h" #include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" +#include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_resowner.h" #include "distributed/multi_server_executor.h" +#include "distributed/resource_lock.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" #include "storage/fd.h" @@ -922,3 +926,53 @@ UpdateConnectionCounter(WorkerNodeState *workerNode, ConnectAction connectAction workerNode->openConnectionCount--; } } + + +/* + * RealTimeExecScan is a callback function which returns next tuple from a real-time + * execution. In the first call, it executes distributed real-time plan and loads + * results from temporary files into custom scan's tuple store. Then, it returns + * tuples one by one from this tuple store. + */ +TupleTableSlot * +RealTimeExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + DistributedPlan *distributedPlan = scanState->distributedPlan; + Job *workerJob = distributedPlan->workerJob; + + /* we are taking locks on partitions of partitioned tables */ + LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); + + PrepareMasterJobDirectory(workerJob); + MultiRealTimeExecute(workerJob); + + LoadTuplesIntoTupleStore(scanState, workerJob); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} + + +/* + * PrepareMasterJobDirectory creates a directory on the master node to keep job + * execution results. We also register this directory for automatic cleanup on + * portal delete. + */ +void +PrepareMasterJobDirectory(Job *workerJob) +{ + StringInfo jobDirectoryName = MasterJobDirectoryName(workerJob->jobId); + CitusCreateDirectory(jobDirectoryName); + + ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner); + ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId); +} diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 2f4fd9bfc..8c1843b68 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -25,13 +25,16 @@ #include #include "commands/dbcommands.h" +#include "distributed/citus_custom_scan.h" #include "distributed/citus_nodes.h" #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" +#include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/pg_dist_partition.h" +#include "distributed/resource_lock.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" #include "storage/fd.h" @@ -3004,3 +3007,37 @@ TrackerHashDisconnect(HTAB *taskTrackerHash) taskTracker = (TaskTracker *) hash_seq_search(&status); } } + + +/* + * TaskTrackerExecScan is a callback function which returns next tuple from a + * task-tracker execution. In the first call, it executes distributed task-tracker + * plan and loads results from temporary files into custom scan's tuple store. + * Then, it returns tuples one by one from this tuple store. + */ +TupleTableSlot * +TaskTrackerExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + DistributedPlan *distributedPlan = scanState->distributedPlan; + Job *workerJob = distributedPlan->workerJob; + + /* we are taking locks on partitions of partitioned tables */ + LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); + + PrepareMasterJobDirectory(workerJob); + MultiTaskTrackerExecute(workerJob); + + LoadTuplesIntoTupleStore(scanState, workerJob); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 43c552684..ae163d0f6 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -39,32 +39,6 @@ static List *plannerRestrictionContextList = NIL; int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */ -/* create custom scan methods for separate executors */ -static CustomScanMethods RealTimeCustomScanMethods = { - "Citus Real-Time", - RealTimeCreateScan -}; - -static CustomScanMethods TaskTrackerCustomScanMethods = { - "Citus Task-Tracker", - TaskTrackerCreateScan -}; - -static CustomScanMethods RouterCustomScanMethods = { - "Citus Router", - RouterCreateScan -}; - -static CustomScanMethods CoordinatorInsertSelectCustomScanMethods = { - "Citus INSERT ... SELECT via coordinator", - CoordinatorInsertSelectCreateScan -}; - -static CustomScanMethods DelayedErrorCustomScanMethods = { - "Citus Delayed Error", - DelayedErrorCreateScan -}; - /* local function forward declarations */ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index b34592e6e..588336e0f 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -189,6 +189,9 @@ _PG_init(void) /* make our additional node types known */ RegisterNodes(); + /* make our custom scan nodes known */ + RegisterCitusCustomScanMethods(); + /* intercept planner */ planner_hook = distributed_planner; diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h new file mode 100644 index 000000000..2ef734e63 --- /dev/null +++ b/src/include/distributed/citus_custom_scan.h @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * + * citus_custom_scan.h + * Export all custom scan and custom exec methods. + * + * Copyright (c) 2012-2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_CUSTOM_SCAN_H +#define CITUS_CUSTOM_SCAN_H + +#include "distributed/distributed_planner.h" +#include "distributed/multi_server_executor.h" +#include "executor/execdesc.h" +#include "nodes/plannodes.h" + + +typedef struct CitusScanState +{ + CustomScanState customScanState; /* underlying custom scan node */ + DistributedPlan *distributedPlan; /* distributed execution plan */ + MultiExecutorType executorType; /* distributed executor type */ + bool finishedRemoteScan; /* flag to check if remote scan is finished */ + Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ +} CitusScanState; + + +/* custom scan methods for all executors */ +extern CustomScanMethods RealTimeCustomScanMethods; +extern CustomScanMethods TaskTrackerCustomScanMethods; +extern CustomScanMethods RouterCustomScanMethods; +extern CustomScanMethods CoordinatorInsertSelectCustomScanMethods; +extern CustomScanMethods DelayedErrorCustomScanMethods; + + +extern void RegisterCitusCustomScanMethods(void); +extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct + ExplainState *es); + +#endif /* CITUS_CUSTOM_SCAN_H */ diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 69af67a81..c2fbff914 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -14,20 +14,11 @@ #include "nodes/parsenodes.h" #include "nodes/execnodes.h" +#include "distributed/citus_custom_scan.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" -typedef struct CitusScanState -{ - CustomScanState customScanState; /* underlying custom scan node */ - DistributedPlan *distributedPlan; /* distributed execution plan */ - MultiExecutorType executorType; /* distributed executor type */ - bool finishedRemoteScan; /* flag to check if remote scan is finished */ - Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ -} CitusScanState; - - /* managed via guc.c */ typedef enum { @@ -36,19 +27,9 @@ typedef enum } MultiShardConnectionTypes; extern int MultiShardConnectionType; -extern Node * RealTimeCreateScan(CustomScan *scan); -extern Node * TaskTrackerCreateScan(CustomScan *scan); -extern Node * RouterCreateScan(CustomScan *scan); -extern Node * CoordinatorInsertSelectCreateScan(CustomScan *scan); -extern Node * DelayedErrorCreateScan(CustomScan *scan); -extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags); -extern TupleTableSlot * RealTimeExecScan(CustomScanState *node); -extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node); -extern void CitusEndScan(CustomScanState *node); -extern void CitusReScan(CustomScanState *node); -extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct - ExplainState *es); + extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); +extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 78fa5f3cb..cab5b35a1 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -202,6 +202,10 @@ extern void CleanupTaskExecution(TaskExecution *taskExecution); extern bool TaskExecutionFailed(TaskExecution *taskExecution); extern void AdjustStateForFailure(TaskExecution *taskExecution); extern int MaxMasterConnectionCount(void); +extern void PrepareMasterJobDirectory(Job *workerJob); +extern TupleTableSlot * RealTimeExecScan(CustomScanState *node); +extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node); + #endif /* MULTI_SERVER_EXECUTOR_H */