Merge pull request #1825 from citusdata/register_custom_scans

Register custom scans
pull/1823/head
Önder Kalacı 2017-11-23 13:00:58 +03:00 committed by GitHub
commit 309ba9f0d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 475 additions and 386 deletions

View File

@ -0,0 +1,311 @@
/*-------------------------------------------------------------------------
*
* citus_custom_scan.c
*
* Definitions of custom scan methods for all executor types.
*
* Copyright (c) 2012-2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "commands/copy.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"
/* functions for creating custom scan nodes */
static Node * RealTimeCreateScan(CustomScan *scan);
static Node * TaskTrackerCreateScan(CustomScan *scan);
static Node * RouterCreateScan(CustomScan *scan);
static Node * CoordinatorInsertSelectCreateScan(CustomScan *scan);
static Node * DelayedErrorCreateScan(CustomScan *scan);
/* functions that are common to different scans */
static void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);
/* create custom scan methods for all executors */
CustomScanMethods RealTimeCustomScanMethods = {
"Citus Real-Time",
RealTimeCreateScan
};
CustomScanMethods TaskTrackerCustomScanMethods = {
"Citus Task-Tracker",
TaskTrackerCreateScan
};
CustomScanMethods RouterCustomScanMethods = {
"Citus Router",
RouterCreateScan
};
CustomScanMethods CoordinatorInsertSelectCustomScanMethods = {
"Citus INSERT ... SELECT via coordinator",
CoordinatorInsertSelectCreateScan
};
CustomScanMethods DelayedErrorCustomScanMethods = {
"Citus Delayed Error",
DelayedErrorCreateScan
};
/*
* Define executor methods for the different executor types.
*/
static CustomExecMethods RealTimeCustomExecMethods = {
.CustomName = "RealTimeScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = RealTimeExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods TaskTrackerCustomExecMethods = {
.CustomName = "TaskTrackerScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = TaskTrackerExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RouterSequentialModifyCustomExecMethods = {
.CustomName = "RouterSequentialModifyScan",
.BeginCustomScan = CitusModifyBeginScan,
.ExecCustomScan = RouterSequentialModifyExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RouterMultiModifyCustomExecMethods = {
.CustomName = "RouterMultiModifyScan",
.BeginCustomScan = CitusModifyBeginScan,
.ExecCustomScan = RouterMultiModifyExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RouterSelectCustomExecMethods = {
.CustomName = "RouterSelectScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = RouterSelectExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = {
.CustomName = "CoordinatorInsertSelectScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = CoordinatorInsertSelectExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CoordinatorInsertSelectExplainScan
};
/*
* Let PostgreSQL know about Citus' custom scan nodes.
*/
void
RegisterCitusCustomScanMethods(void)
{
RegisterCustomScanMethods(&RealTimeCustomScanMethods);
RegisterCustomScanMethods(&TaskTrackerCustomScanMethods);
RegisterCustomScanMethods(&RouterCustomScanMethods);
RegisterCustomScanMethods(&CoordinatorInsertSelectCustomScanMethods);
RegisterCustomScanMethods(&DelayedErrorCustomScanMethods);
}
/*
* RealTimeCreateScan creates the scan state for real-time executor queries.
*/
static Node *
RealTimeCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_REAL_TIME;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &RealTimeCustomExecMethods;
return (Node *) scanState;
}
/*
* TaskTrackerCreateScan creates the scan state for task-tracker executor queries.
*/
static Node *
TaskTrackerCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &TaskTrackerCustomExecMethods;
return (Node *) scanState;
}
/*
* RouterCreateScan creates the scan state for router executor queries.
*/
static Node *
RouterCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
DistributedPlan *distributedPlan = NULL;
Job *workerJob = NULL;
List *taskList = NIL;
bool isModificationQuery = false;
scanState->executorType = MULTI_EXECUTOR_ROUTER;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
distributedPlan = scanState->distributedPlan;
workerJob = distributedPlan->workerJob;
taskList = workerJob->taskList;
isModificationQuery = IsModifyDistributedPlan(distributedPlan);
/* check whether query has at most one shard */
if (list_length(taskList) <= 1)
{
if (isModificationQuery)
{
scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods;
}
else
{
scanState->customScanState.methods = &RouterSelectCustomExecMethods;
}
}
else
{
Assert(isModificationQuery);
if (IsMultiRowInsert(workerJob->jobQuery) ||
(IsUpdateOrDelete(distributedPlan) &&
MultiShardConnectionType == SEQUENTIAL_CONNECTION))
{
/*
* Multi shard update deletes while multi_shard_modify_mode equals
* to 'sequential' or Multi-row INSERT are executed sequentially
* instead of using parallel connections.
*/
scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods;
}
else
{
scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods;
}
}
return (Node *) scanState;
}
/*
* CoordinatorInsertSelectCrateScan creates the scan state for executing
* INSERT..SELECT into a distributed table via the coordinator.
*/
static Node *
CoordinatorInsertSelectCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods;
return (Node *) scanState;
}
/*
* DelayedErrorCreateScan is only called if we could not plan for the given
* query. This is the case when a plan is not ready for execution because
* CreateDistributedPlan() couldn't find a plan due to unresolved prepared
* statement parameters, but didn't error out, because we expect custom plans
* to come to our rescue. But sql (not plpgsql) functions unfortunately don't
* go through a codepath supporting custom plans. Here, we error out with this
* delayed error message.
*/
static Node *
DelayedErrorCreateScan(CustomScan *scan)
{
DistributedPlan *distributedPlan = GetDistributedPlan(scan);
/* raise the deferred error */
RaiseDeferredError(distributedPlan->planningError, ERROR);
return NULL;
}
/*
* CitusSelectBeginScan is an empty function for BeginCustomScan callback.
*/
static void
CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags)
{
/* just an empty function */
}
/*
* CitusEndScan is used to clean up tuple store of the given custom scan state.
*/
static void
CitusEndScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
if (scanState->tuplestorestate)
{
tuplestore_end(scanState->tuplestorestate);
scanState->tuplestorestate = NULL;
}
}
/*
* CitusReScan is just a place holder for rescan callback. Currently, we don't
* support rescan given that there is not any way to reach this code path.
*/
static void
CitusReScan(CustomScanState *node)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("rescan is unsupported"),
errdetail("We don't expect this code path to be executed.")));
}

View File

@ -15,6 +15,7 @@
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/multi_copy.h"
@ -41,265 +42,44 @@
/* controls the connection type for multi shard update/delete queries */
int MultiShardConnectionType = PARALLEL_CONNECTION;
/*
* Define executor methods for the different executor types.
*/
static CustomExecMethods RealTimeCustomExecMethods = {
.CustomName = "RealTimeScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = RealTimeExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods TaskTrackerCustomExecMethods = {
.CustomName = "TaskTrackerScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = TaskTrackerExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RouterSequentialModifyCustomExecMethods = {
.CustomName = "RouterSequentialModifyScan",
.BeginCustomScan = CitusModifyBeginScan,
.ExecCustomScan = RouterSequentialModifyExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RouterMultiModifyCustomExecMethods = {
.CustomName = "RouterMultiModifyScan",
.BeginCustomScan = CitusModifyBeginScan,
.ExecCustomScan = RouterMultiModifyExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods RouterSelectCustomExecMethods = {
.CustomName = "RouterSelectScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = RouterSelectExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = {
.CustomName = "CoordinatorInsertSelectScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = CoordinatorInsertSelectExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CoordinatorInsertSelectExplainScan
};
/* local function forward declarations */
static void PrepareMasterJobDirectory(Job *workerJob);
static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
/* ocal function forward declarations */
static Relation StubRelation(TupleDesc tupleDescriptor);
/*
* RealTimeCreateScan creates the scan state for real-time executor queries.
*/
Node *
RealTimeCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_REAL_TIME;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &RealTimeCustomExecMethods;
return (Node *) scanState;
}
/*
* TaskTrackerCreateScan creates the scan state for task-tracker executor queries.
*/
Node *
TaskTrackerCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &TaskTrackerCustomExecMethods;
return (Node *) scanState;
}
/*
* RouterCreateScan creates the scan state for router executor queries.
*/
Node *
RouterCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
DistributedPlan *distributedPlan = NULL;
Job *workerJob = NULL;
List *taskList = NIL;
bool isModificationQuery = false;
scanState->executorType = MULTI_EXECUTOR_ROUTER;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
distributedPlan = scanState->distributedPlan;
workerJob = distributedPlan->workerJob;
taskList = workerJob->taskList;
isModificationQuery = IsModifyDistributedPlan(distributedPlan);
/* check whether query has at most one shard */
if (list_length(taskList) <= 1)
{
if (isModificationQuery)
{
scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods;
}
else
{
scanState->customScanState.methods = &RouterSelectCustomExecMethods;
}
}
else
{
Assert(isModificationQuery);
if (IsMultiRowInsert(workerJob->jobQuery) ||
(IsUpdateOrDelete(distributedPlan) &&
MultiShardConnectionType == SEQUENTIAL_CONNECTION))
{
/*
* Multi shard update deletes while multi_shard_modify_mode equals
* to 'sequential' or Multi-row INSERT are executed sequentially
* instead of using parallel connections.
*/
scanState->customScanState.methods = &RouterSequentialModifyCustomExecMethods;
}
else
{
scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods;
}
}
return (Node *) scanState;
}
/*
* CoordinatorInsertSelectCrateScan creates the scan state for executing
* INSERT..SELECT into a distributed table via the coordinator.
*/
Node *
CoordinatorInsertSelectCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods;
return (Node *) scanState;
}
/*
* DelayedErrorCreateScan is only called if we could not plan for the given
* query. This is the case when a plan is not ready for execution because
* CreateDistributedPlan() couldn't find a plan due to unresolved prepared
* statement parameters, but didn't error out, because we expect custom plans
* to come to our rescue. But sql (not plpgsql) functions unfortunately don't
* go through a codepath supporting custom plans. Here, we error out with this
* delayed error message.
*/
Node *
DelayedErrorCreateScan(CustomScan *scan)
{
DistributedPlan *distributedPlan = GetDistributedPlan(scan);
/* raise the deferred error */
RaiseDeferredError(distributedPlan->planningError, ERROR);
return NULL;
}
/*
* CitusSelectBeginScan is an empty function for BeginCustomScan callback.
*/
void
CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags)
{
/* just an empty function */
}
/*
* RealTimeExecScan is a callback function which returns next tuple from a real-time
* execution. In the first call, it executes distributed real-time plan and loads
* results from temporary files into custom scan's tuple store. Then, it returns
* tuples one by one from this tuple store.
* ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
* given Citus scan node and returns it. It returns null if all tuples are read
* from the tuple store.
*/
TupleTableSlot *
RealTimeExecScan(CustomScanState *node)
ReturnTupleFromTuplestore(CitusScanState *scanState)
{
CitusScanState *scanState = (CitusScanState *) node;
Tuplestorestate *tupleStore = scanState->tuplestorestate;
TupleTableSlot *resultSlot = NULL;
ScanDirection scanDirection = NoMovementScanDirection;
bool forwardScanDirection = true;
if (!scanState->finishedRemoteScan)
if (tupleStore == NULL)
{
DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = distributedPlan->workerJob;
/* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
PrepareMasterJobDirectory(workerJob);
MultiRealTimeExecute(workerJob);
LoadTuplesIntoTupleStore(scanState, workerJob);
scanState->finishedRemoteScan = true;
return NULL;
}
resultSlot = ReturnTupleFromTuplestore(scanState);
scanDirection = scanState->customScanState.ss.ps.state->es_direction;
Assert(ScanDirectionIsValid(scanDirection));
if (ScanDirectionIsBackward(scanDirection))
{
forwardScanDirection = false;
}
resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot;
tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, resultSlot);
return resultSlot;
}
/*
* PrepareMasterJobDirectory creates a directory on the master node to keep job
* execution results. We also register this directory for automatic cleanup on
* portal delete.
*/
static void
PrepareMasterJobDirectory(Job *workerJob)
{
StringInfo jobDirectoryName = MasterJobDirectoryName(workerJob->jobId);
CitusCreateDirectory(jobDirectoryName);
ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner);
ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId);
}
/*
* Load data collected by real-time or task-tracker executors into the tuplestore
* of CitusScanState. For that, we first create a tuple store, and then copy the
@ -308,7 +88,7 @@ PrepareMasterJobDirectory(Job *workerJob)
* Note that in the long term it'd be a lot better if Multi*Execute() directly
* filled the tuplestores, but that's a fair bit of work.
*/
static void
void
LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
{
CustomScanState customScanState = citusScanState->customScanState;
@ -415,99 +195,3 @@ StubRelation(TupleDesc tupleDescriptor)
return stubRelation;
}
/*
* ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
* given Citus scan node and returns it. It returns null if all tuples are read
* from the tuple store.
*/
TupleTableSlot *
ReturnTupleFromTuplestore(CitusScanState *scanState)
{
Tuplestorestate *tupleStore = scanState->tuplestorestate;
TupleTableSlot *resultSlot = NULL;
ScanDirection scanDirection = NoMovementScanDirection;
bool forwardScanDirection = true;
if (tupleStore == NULL)
{
return NULL;
}
scanDirection = scanState->customScanState.ss.ps.state->es_direction;
Assert(ScanDirectionIsValid(scanDirection));
if (ScanDirectionIsBackward(scanDirection))
{
forwardScanDirection = false;
}
resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot;
tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, resultSlot);
return resultSlot;
}
/*
* TaskTrackerExecScan is a callback function which returns next tuple from a
* task-tracker execution. In the first call, it executes distributed task-tracker
* plan and loads results from temporary files into custom scan's tuple store.
* Then, it returns tuples one by one from this tuple store.
*/
TupleTableSlot *
TaskTrackerExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
TupleTableSlot *resultSlot = NULL;
if (!scanState->finishedRemoteScan)
{
DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = distributedPlan->workerJob;
/* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
PrepareMasterJobDirectory(workerJob);
MultiTaskTrackerExecute(workerJob);
LoadTuplesIntoTupleStore(scanState, workerJob);
scanState->finishedRemoteScan = true;
}
resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot;
}
/*
* CitusEndScan is used to clean up tuple store of the given custom scan state.
*/
void
CitusEndScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
if (scanState->tuplestorestate)
{
tuplestore_end(scanState->tuplestorestate);
scanState->tuplestorestate = NULL;
}
}
/*
* CitusReScan is just a place holder for rescan callback. Currently, we don't
* support rescan given that there is not any way to reach this code path.
*/
void
CitusReScan(CustomScanState *node)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("rescan is unsupported"),
errdetail("We don't expect this code path to be executed.")));
}

View File

@ -23,10 +23,14 @@
#include <unistd.h>
#include "commands/dbcommands.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/connection_management.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_resowner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "storage/fd.h"
@ -922,3 +926,53 @@ UpdateConnectionCounter(WorkerNodeState *workerNode, ConnectAction connectAction
workerNode->openConnectionCount--;
}
}
/*
* RealTimeExecScan is a callback function which returns next tuple from a real-time
* execution. In the first call, it executes distributed real-time plan and loads
* results from temporary files into custom scan's tuple store. Then, it returns
* tuples one by one from this tuple store.
*/
TupleTableSlot *
RealTimeExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
TupleTableSlot *resultSlot = NULL;
if (!scanState->finishedRemoteScan)
{
DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = distributedPlan->workerJob;
/* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
PrepareMasterJobDirectory(workerJob);
MultiRealTimeExecute(workerJob);
LoadTuplesIntoTupleStore(scanState, workerJob);
scanState->finishedRemoteScan = true;
}
resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot;
}
/*
* PrepareMasterJobDirectory creates a directory on the master node to keep job
* execution results. We also register this directory for automatic cleanup on
* portal delete.
*/
void
PrepareMasterJobDirectory(Job *workerJob)
{
StringInfo jobDirectoryName = MasterJobDirectoryName(workerJob->jobId);
CitusCreateDirectory(jobDirectoryName);
ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner);
ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId);
}

View File

@ -25,13 +25,16 @@
#include <math.h>
#include "commands/dbcommands.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_nodes.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "storage/fd.h"
@ -3004,3 +3007,37 @@ TrackerHashDisconnect(HTAB *taskTrackerHash)
taskTracker = (TaskTracker *) hash_seq_search(&status);
}
}
/*
* TaskTrackerExecScan is a callback function which returns next tuple from a
* task-tracker execution. In the first call, it executes distributed task-tracker
* plan and loads results from temporary files into custom scan's tuple store.
* Then, it returns tuples one by one from this tuple store.
*/
TupleTableSlot *
TaskTrackerExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
TupleTableSlot *resultSlot = NULL;
if (!scanState->finishedRemoteScan)
{
DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = distributedPlan->workerJob;
/* we are taking locks on partitions of partitioned tables */
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock);
PrepareMasterJobDirectory(workerJob);
MultiTaskTrackerExecute(workerJob);
LoadTuplesIntoTupleStore(scanState, workerJob);
scanState->finishedRemoteScan = true;
}
resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot;
}

View File

@ -39,32 +39,6 @@
static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */
/* create custom scan methods for separate executors */
static CustomScanMethods RealTimeCustomScanMethods = {
"Citus Real-Time",
RealTimeCreateScan
};
static CustomScanMethods TaskTrackerCustomScanMethods = {
"Citus Task-Tracker",
TaskTrackerCreateScan
};
static CustomScanMethods RouterCustomScanMethods = {
"Citus Router",
RouterCreateScan
};
static CustomScanMethods CoordinatorInsertSelectCustomScanMethods = {
"Citus INSERT ... SELECT via coordinator",
CoordinatorInsertSelectCreateScan
};
static CustomScanMethods DelayedErrorCustomScanMethods = {
"Citus Delayed Error",
DelayedErrorCreateScan
};
/* local function forward declarations */
static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery,

View File

@ -189,6 +189,9 @@ _PG_init(void)
/* make our additional node types known */
RegisterNodes();
/* make our custom scan nodes known */
RegisterCitusCustomScanMethods();
/* intercept planner */
planner_hook = distributed_planner;

View File

@ -0,0 +1,41 @@
/*-------------------------------------------------------------------------
*
* citus_custom_scan.h
* Export all custom scan and custom exec methods.
*
* Copyright (c) 2012-2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef CITUS_CUSTOM_SCAN_H
#define CITUS_CUSTOM_SCAN_H
#include "distributed/distributed_planner.h"
#include "distributed/multi_server_executor.h"
#include "executor/execdesc.h"
#include "nodes/plannodes.h"
typedef struct CitusScanState
{
CustomScanState customScanState; /* underlying custom scan node */
DistributedPlan *distributedPlan; /* distributed execution plan */
MultiExecutorType executorType; /* distributed executor type */
bool finishedRemoteScan; /* flag to check if remote scan is finished */
Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */
} CitusScanState;
/* custom scan methods for all executors */
extern CustomScanMethods RealTimeCustomScanMethods;
extern CustomScanMethods TaskTrackerCustomScanMethods;
extern CustomScanMethods RouterCustomScanMethods;
extern CustomScanMethods CoordinatorInsertSelectCustomScanMethods;
extern CustomScanMethods DelayedErrorCustomScanMethods;
extern void RegisterCitusCustomScanMethods(void);
extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct
ExplainState *es);
#endif /* CITUS_CUSTOM_SCAN_H */

View File

@ -14,20 +14,11 @@
#include "nodes/parsenodes.h"
#include "nodes/execnodes.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
typedef struct CitusScanState
{
CustomScanState customScanState; /* underlying custom scan node */
DistributedPlan *distributedPlan; /* distributed execution plan */
MultiExecutorType executorType; /* distributed executor type */
bool finishedRemoteScan; /* flag to check if remote scan is finished */
Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */
} CitusScanState;
/* managed via guc.c */
typedef enum
{
@ -36,19 +27,9 @@ typedef enum
} MultiShardConnectionTypes;
extern int MultiShardConnectionType;
extern Node * RealTimeCreateScan(CustomScan *scan);
extern Node * TaskTrackerCreateScan(CustomScan *scan);
extern Node * RouterCreateScan(CustomScan *scan);
extern Node * CoordinatorInsertSelectCreateScan(CustomScan *scan);
extern Node * DelayedErrorCreateScan(CustomScan *scan);
extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags);
extern TupleTableSlot * RealTimeExecScan(CustomScanState *node);
extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node);
extern void CitusEndScan(CustomScanState *node);
extern void CitusReScan(CustomScanState *node);
extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct
ExplainState *es);
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
#endif /* MULTI_EXECUTOR_H */

View File

@ -202,6 +202,10 @@ extern void CleanupTaskExecution(TaskExecution *taskExecution);
extern bool TaskExecutionFailed(TaskExecution *taskExecution);
extern void AdjustStateForFailure(TaskExecution *taskExecution);
extern int MaxMasterConnectionCount(void);
extern void PrepareMasterJobDirectory(Job *workerJob);
extern TupleTableSlot * RealTimeExecScan(CustomScanState *node);
extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node);
#endif /* MULTI_SERVER_EXECUTOR_H */