Use CustomScan API for query execution

Custom Scan is a node in the planned statement which helps external providers
abstract data scans, not just for foreign data wrappers but also for regular
relations, so you can benefit from your own version of caching or hardware
optimizations. This sounds like only an abstraction on the data scan layer, but
we can use it as an abstraction for our distributed queries. The only thing we
need to do is to find the distributable parts of the query, plan for them and
replace them with a Citus Custom Scan. Then, whenever PostgreSQL hits this
custom scan node in its Volcano-style execution, it will call our callback
functions, which run the distributed plan and provide tuples to the upper node
as if it were scanning a regular relation. This means fewer code changes, fewer
bugs and more supported features for us!

First, in the distributed query planner phase, we create a Custom Scan which
wraps the distributed plan. For the real-time and task-tracker executors, we
add this custom plan under the master query plan. For the router executor, we
directly pass the custom plan because there is no master query. Then, we simply
let the PostgreSQL executor run this plan. When it hits the custom scan node,
we call the related executor parts for the distributed plan, fill the tuple
store in the custom scan and return results to the PostgreSQL executor in
Volcano style, one tuple per XXX_ExecScan() call.

* Modify planner to utilize Custom Scan node.
* Create different scan methods for different executors.
* Use native PostgreSQL Explain for master part of queries.
pull/1185/head
Metin Doslu 2017-02-15 18:00:54 +02:00
parent 52358fe891
commit 1f838199f8
46 changed files with 1155 additions and 964 deletions

View File

@ -24,7 +24,6 @@
#include "distributed/multi_utility.h"
#include "distributed/worker_protocol.h"
#include "executor/execdebug.h"
#include "executor/executor.h"
#include "commands/copy.h"
#include "nodes/makefuncs.h"
#include "storage/lmgr.h"
@ -34,171 +33,276 @@
/*
* FIXME: It'd probably be better to have different set of methods for:
* - router readonly queries
* - router modify
* - router insert ... select
* - real-time/task-tracker (no point in separating those)
*
* I think it's better however to only have one type of CitusScanState, to
* allow to easily share code between routines.
* Define executor methods for the different executor types.
*/
static CustomExecMethods CitusCustomExecMethods = {
"CitusScan",
CitusBeginScan,
CitusExecScan,
CitusEndScan,
CitusReScan,
#if (PG_VERSION_NUM >= 90600)
NULL, /* NO EstimateDSMCustomScan callback */
NULL, /* NO InitializeDSMCustomScan callback */
NULL, /* NO InitializeWorkerCustomScan callback */
#endif
NULL,
NULL,
CitusExplainScan
/*
* Callback table for real-time executor scans. RealTimeCreateScan() installs
* it on the scan state. Only ExecCustomScan is specific to this executor;
* the begin/end/rescan/explain entries are the shared Citus callbacks.
*/
static CustomExecMethods RealTimeCustomExecMethods = {
.CustomName = "RealTimeScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = RealTimeExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
/*
* Callback table for task-tracker executor scans. TaskTrackerCreateScan()
* installs it on the scan state. Only ExecCustomScan is specific to this
* executor; the remaining entries are the shared Citus callbacks.
*/
static CustomExecMethods TaskTrackerCustomExecMethods = {
.CustomName = "TaskTrackerScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = TaskTrackerExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
/*
* Callback table for router modification queries with exactly one task.
* RouterCreateScan() picks it when the task list has a single entry and the
* plan is a modification. Uses CitusModifyBeginScan instead of the empty
* select begin callback.
*/
static CustomExecMethods RouterSingleModifyCustomExecMethods = {
.CustomName = "RouterSingleModifyScan",
.BeginCustomScan = CitusModifyBeginScan,
.ExecCustomScan = RouterSingleModifyExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
/*
* Callback table for router modification queries whose task list does not
* consist of exactly one task. RouterCreateScan() picks it in that case and
* asserts that the plan is a modification.
*/
static CustomExecMethods RouterMultiModifyCustomExecMethods = {
.CustomName = "RouterMultiModifyScan",
.BeginCustomScan = CitusModifyBeginScan,
.ExecCustomScan = RouterMultiModifyExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
/*
* Callback table for router select queries. RouterCreateScan() picks it when
* the task list has a single entry and the plan is not a modification.
*/
static CustomExecMethods RouterSelectCustomExecMethods = {
.CustomName = "RouterSelectScan",
.BeginCustomScan = CitusSelectBeginScan,
.ExecCustomScan = RouterSelectExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = CitusExplainScan
};
/* local function forward declarations */
static void PrepareMasterJobDirectory(Job *workerJob);
static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
static Relation StubRelation(TupleDesc tupleDescriptor);
/*
* RealTimeCreateScan creates the scan state for real-time executor queries.
*/
Node *
CitusCreateScan(CustomScan *scan)
RealTimeCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_REAL_TIME;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->customScanState.methods = &CitusCustomExecMethods;
scanState->multiPlan = GetMultiPlan(scan);
scanState->executorType = JobExecutorType(scanState->multiPlan);
scanState->customScanState.methods = &RealTimeCustomExecMethods;
return (Node *) scanState;
}
void
CitusBeginScan(CustomScanState *node,
EState *estate,
int eflags)
/*
* TaskTrackerCreateScan creates the scan state for task-tracker executor queries.
*/
Node *
TaskTrackerCreateScan(CustomScan *scan)
{
CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan;
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
Assert(IsA(scanState, CustomScanState));
scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->multiPlan = GetMultiPlan(scan);
/* ensure plan is executable */
VerifyMultiPlanValidity(multiPlan);
scanState->customScanState.methods = &TaskTrackerCustomExecMethods;
/* ExecCheckRTPerms(planStatement->rtable, true); */
if (scanState->executorType == MULTI_EXECUTOR_ROUTER)
{
RouterBeginScan(scanState);
}
return (Node *) scanState;
}
TupleTableSlot *
CitusExecScan(CustomScanState *node)
/*
* RouterCreateScan creates the scan state for router executor queries.
*/
Node *
RouterCreateScan(CustomScan *scan)
{
CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan;
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
MultiPlan *multiPlan = NULL;
Job *workerJob = NULL;
List *taskList = NIL;
bool isModificationQuery = false;
if (scanState->executorType == MULTI_EXECUTOR_ROUTER)
scanState->executorType = MULTI_EXECUTOR_ROUTER;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->multiPlan = GetMultiPlan(scan);
multiPlan = scanState->multiPlan;
workerJob = multiPlan->workerJob;
taskList = workerJob->taskList;
isModificationQuery = IsModifyMultiPlan(multiPlan);
/* check if this is a single shard query */
if (list_length(taskList) == 1)
{
return RouterExecScan(scanState);
if (isModificationQuery)
{
scanState->customScanState.methods = &RouterSingleModifyCustomExecMethods;
}
else
{
TupleTableSlot *resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot;
if (!scanState->finishedUnderlyingScan)
scanState->customScanState.methods = &RouterSelectCustomExecMethods;
}
}
else
{
Job *workerJob = multiPlan->workerJob;
StringInfo jobDirectoryName = NULL;
EState *executorState = scanState->customScanState.ss.ps.state;
List *workerTaskList = workerJob->taskList;
ListCell *workerTaskCell = NULL;
TupleDesc tupleDescriptor = NULL;
Relation fakeRel = NULL;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
ExprContext *executorExpressionContext =
GetPerTupleExprContext(executorState);
uint32 columnCount = 0;
Datum *columnValues = NULL;
bool *columnNulls = NULL;
Assert(isModificationQuery);
scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods;
}
/*
* We create a directory on the master node to keep task execution results.
* We also register this directory for automatic cleanup on portal delete.
return (Node *) scanState;
}
/*
* DelayedErrorCreateScan is only called if we could not plan for the given
* query. This is the case when a plan is not ready for execution because
* CreateDistributedPlan() couldn't find a plan due to unresolved prepared
* statement parameters, but didn't error out, because we expect custom plans
* to come to our rescue. But sql (not plpgsql) functions unfortunately don't
* go through a codepath supporting custom plans. Here, we error out with this
* delayed error message.
*/
jobDirectoryName = MasterJobDirectoryName(workerJob->jobId);
Node *
DelayedErrorCreateScan(CustomScan *scan)
{
	MultiPlan *distributedPlan = GetMultiPlan(scan);

	/* report the planning error that was stored on the plan, at ERROR level */
	RaiseDeferredError(distributedPlan->planningError, ERROR);

	/* the function still has to end with a return of its declared type */
	return NULL;
}
/*
* CitusSelectBeginScan is the BeginCustomScan callback referenced by the
* real-time, task-tracker and router select method tables. It takes the
* standard callback arguments (scan state node, executor state, executor
* flags) but uses none of them: the body is intentionally empty.
*/
void
CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags)
{
/* intentionally empty: no work is done when one of these scans begins */
}
/*
 * RealTimeExecScan is the ExecCustomScan callback of the real-time executor.
 * On the first call (finishedRemoteScan still false) it prepares the master
 * job directory, runs the worker job through MultiRealTimeExecute() and loads
 * the results into the scan state's tuple store. That call and every later
 * one then hand back the next tuple via ReturnTupleFromTuplestore().
 */
TupleTableSlot *
RealTimeExecScan(CustomScanState *node)
{
	CitusScanState *citusScanState = (CitusScanState *) node;

	/* the remote part of the plan is executed only once per scan state */
	if (!citusScanState->finishedRemoteScan)
	{
		Job *job = citusScanState->multiPlan->workerJob;

		PrepareMasterJobDirectory(job);
		MultiRealTimeExecute(job);
		LoadTuplesIntoTupleStore(citusScanState, job);

		citusScanState->finishedRemoteScan = true;
	}

	return ReturnTupleFromTuplestore(citusScanState);
}
/*
 * PrepareMasterJobDirectory creates the master-side directory named after the
 * job's id, which keeps the job's execution results. The job id is then
 * remembered in the current resource owner so that the directory is cleaned
 * up automatically on portal delete.
 */
static void
PrepareMasterJobDirectory(Job *workerJob)
{
	CreateDirectory(MasterJobDirectoryName(workerJob->jobId));

	/* enlarge the resource owner's bookkeeping, then record this job in it */
	ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner);
	ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId);
}
/* pick distributed executor to use */
if (executorState->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY)
{
/* skip distributed query execution for EXPLAIN commands */
}
else if (scanState->executorType == MULTI_EXECUTOR_REAL_TIME)
{
MultiRealTimeExecute(workerJob);
}
else if (scanState->executorType == MULTI_EXECUTOR_TASK_TRACKER)
{
MultiTaskTrackerExecute(workerJob);
}
tupleDescriptor = node->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
/*
* Load data, collected by Multi*Execute() above, into a
* tuplestore. For that first create a tuplestore, and then copy
* the files one-by-one.
/*
* Load data collected by real-time or task-tracker executors into the tuplestore
* of CitusScanState. For that, we first create a tuple store, and then copy the
* files one-by-one into the tuple store.
*
* FIXME: Should probably be in a separate routine.
*
* Long term it'd be a lot better if Multi*Execute() directly
* Note that in the long term it'd be a lot better if Multi*Execute() directly
* filled the tuplestores, but that's a fair bit of work.
*/
static void
LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
{
CustomScanState customScanState = citusScanState->customScanState;
List *workerTaskList = workerJob->taskList;
List *copyOptions = NIL;
EState *executorState = NULL;
MemoryContext executorTupleContext = NULL;
ExprContext *executorExpressionContext = NULL;
TupleDesc tupleDescriptor = NULL;
Relation stubRelation = NULL;
ListCell *workerTaskCell = NULL;
uint32 columnCount = 0;
Datum *columnValues = NULL;
bool *columnNulls = NULL;
bool randomAccess = true;
bool interTransactions = false;
/*
* To be able to use copy.c, we need a Relation descriptor. As
* there's no relation corresponding to the data loaded from
* workers, fake one. We just need the bare minimal set of fields
* accessed by BeginCopyFrom().
*
* FIXME: should be abstracted into a separate function.
*/
fakeRel = palloc0(sizeof(RelationData));
fakeRel->rd_att = tupleDescriptor;
fakeRel->rd_rel = palloc0(sizeof(FormData_pg_class));
fakeRel->rd_rel->relkind = RELKIND_RELATION;
executorState = citusScanState->customScanState.ss.ps.state;
executorTupleContext = GetPerTupleMemoryContext(executorState);
executorExpressionContext = GetPerTupleExprContext(executorState);
tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
stubRelation = StubRelation(tupleDescriptor);
columnCount = tupleDescriptor->natts;
columnValues = palloc0(columnCount * sizeof(Datum));
columnNulls = palloc0(columnCount * sizeof(bool));
Assert(scanState->tuplestorestate == NULL);
scanState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem);
Assert(citusScanState->tuplestorestate == NULL);
citusScanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
if (BinaryMasterCopyFormat)
{
DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary"));
copyOptions = lappend(copyOptions, copyOption);
}
foreach(workerTaskCell, workerTaskList)
{
Task *workerTask = (Task *) lfirst(workerTaskCell);
StringInfo jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
StringInfo taskFilename =
TaskFilename(jobDirectoryName, workerTask->taskId);
List *copyOptions = NIL;
StringInfo jobDirectoryName = NULL;
StringInfo taskFilename = NULL;
CopyState copyState = NULL;
if (BinaryMasterCopyFormat)
{
DefElem *copyOption = makeDefElem("format",
(Node *) makeString("binary"));
copyOptions = lappend(copyOptions, copyOption);
}
copyState = BeginCopyFrom(fakeRel, taskFilename->data, false, NULL,
jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId);
copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL,
copyOptions);
while (true)
@ -217,29 +321,101 @@ CitusExecScan(CustomScanState *node)
break;
}
tuplestore_putvalues(scanState->tuplestorestate,
tupleDescriptor,
tuplestore_putvalues(citusScanState->tuplestorestate, tupleDescriptor,
columnValues, columnNulls);
MemoryContextSwitchTo(oldContext);
}
}
scanState->finishedUnderlyingScan = true;
}
if (scanState->tuplestorestate != NULL)
{
Tuplestorestate *tupleStore = scanState->tuplestorestate;
tuplestore_gettupleslot(tupleStore, true, false, resultSlot);
return resultSlot;
}
return NULL;
EndCopyFrom(copyState);
}
}
/*
 * StubRelation builds a fake Relation around the given tuple descriptor.
 * copy.c needs a Relation to work with, but no real relation corresponds to
 * the data loaded from workers. Only the bare minimum accessed by
 * BeginCopyFrom() is filled in: the tuple descriptor and a pg_class form
 * whose relkind is RELKIND_RELATION. Everything else stays zeroed.
 */
static Relation
StubRelation(TupleDesc tupleDescriptor)
{
	Relation fakeRelation = palloc0(sizeof(RelationData));

	fakeRelation->rd_rel = palloc0(sizeof(FormData_pg_class));
	fakeRelation->rd_rel->relkind = RELKIND_RELATION;
	fakeRelation->rd_att = tupleDescriptor;

	return fakeRelation;
}
/*
 * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
 * given Citus scan state, moving forward unless the executor state's scan
 * direction is backward.
 *
 * Returns NULL when the scan state has no tuple store at all. Otherwise it
 * returns the scan's result slot after tuplestore_gettupleslot() has been
 * asked to fill it.
 *
 * NOTE(review): once the store is exhausted the slot is presumably left
 * empty rather than NULL being returned -- confirm against tuplestore.c.
 */
TupleTableSlot *
ReturnTupleFromTuplestore(CitusScanState *scanState)
{
	Tuplestorestate *store = scanState->tuplestorestate;
	TupleTableSlot *slot = NULL;
	ScanDirection direction = NoMovementScanDirection;
	bool readForward = false;

	/* no tuple store was ever created, so there is nothing to hand out */
	if (store == NULL)
	{
		return NULL;
	}

	direction = scanState->customScanState.ss.ps.state->es_direction;
	Assert(ScanDirectionIsValid(direction));

	/* every direction other than backward is read in forward order */
	readForward = !ScanDirectionIsBackward(direction);

	slot = scanState->customScanState.ss.ps.ps_ResultTupleSlot;
	tuplestore_gettupleslot(store, readForward, false, slot);

	return slot;
}
/*
 * TaskTrackerExecScan is the ExecCustomScan callback of the task-tracker
 * executor. On the first call (finishedRemoteScan still false) it prepares
 * the master job directory, runs the worker job through
 * MultiTaskTrackerExecute() and loads the results into the scan state's tuple
 * store. That call and every later one then hand back the next tuple via
 * ReturnTupleFromTuplestore().
 */
TupleTableSlot *
TaskTrackerExecScan(CustomScanState *node)
{
	CitusScanState *citusScanState = (CitusScanState *) node;

	/* the remote part of the plan is executed only once per scan state */
	if (!citusScanState->finishedRemoteScan)
	{
		Job *job = citusScanState->multiPlan->workerJob;

		PrepareMasterJobDirectory(job);
		MultiTaskTrackerExecute(job);
		LoadTuplesIntoTupleStore(citusScanState, job);

		citusScanState->finishedRemoteScan = true;
	}

	return ReturnTupleFromTuplestore(citusScanState);
}
/*
* CitusEndScan is used to clean up tuple store of the given custom scan state.
*/
void
CitusEndScan(CustomScanState *node)
{
@ -253,17 +429,14 @@ CitusEndScan(CustomScanState *node)
}
/*
* CitusReScan is just a place holder for rescan callback. Currently, we don't
* support rescan given that there is not any way to reach this code path.
*/
void
CitusReScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
scanState->tuplestorestate = NULL;
scanState->finishedUnderlyingScan = true;
/*
* XXX: this probably already works, but if not should be easily
* supportable - probably hard to exercise right now though.
*/
elog(WARNING, "unsupported at this point");
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("rescan is unsupported"),
errdetail("We don't expect this code path to be executed.")));
}

View File

@ -77,26 +77,24 @@ static void ReacquireMetadataLocks(List *taskList);
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * GetModifyConnections(List *taskPlacementList,
bool markCritical,
static List * GetModifyConnections(List *taskPlacementList, bool markCritical,
bool startedInTransaction);
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
ParamListInfo paramListInfo,
CitusScanState *scanState,
TupleDesc tupleDescriptor);
ParamListInfo paramListInfo, CitusScanState *scanState);
static List * TaskShardIntervalList(List *taskList);
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *taskList);
static bool RequiresConsistentSnapshot(Task *task);
static void ProcessMasterEvaluableFunctions(Job *workerJob);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
ParamListInfo paramListInfo);
static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
TupleDesc tupleDescriptor, bool failOnError, int64 *rows);
bool failOnError, int64 *rows);
static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
int64 *rows);
@ -407,9 +405,14 @@ RequiresConsistentSnapshot(Task *task)
}
/*
* CitusModifyBeginScan checks the validity of the given custom scan node and
* gets locks on the shards involved in the task list of the distributed plan.
*/
void
RouterBeginScan(CitusScanState *scanState)
CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
{
CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
@ -428,77 +431,115 @@ RouterBeginScan(CitusScanState *scanState)
}
/*
* RouterSingleModifyExecScan executes a single modification query on a
* distributed plan and returns results if there is any.
*/
TupleTableSlot *
RouterExecScan(CitusScanState *scanState)
RouterSingleModifyExecScan(CustomScanState *node)
{
MultiPlan *multiPlan = scanState->multiPlan;
TupleTableSlot *resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot;
CitusScanState *scanState = (CitusScanState *) node;
TupleTableSlot *resultSlot = NULL;
if (!scanState->finishedUnderlyingScan)
if (!scanState->finishedRemoteScan)
{
MultiPlan *multiPlan = scanState->multiPlan;
bool hasReturning = multiPlan->hasReturning;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation;
bool isModificationQuery = false;
CmdType operation = multiPlan->operation;
Task *task = (Task *) linitial(taskList);
/* should use IsModificationStmt or such */
if (operation == CMD_INSERT || operation == CMD_UPDATE ||
operation == CMD_DELETE)
{
isModificationQuery = true;
ProcessMasterEvaluableFunctions(workerJob);
ExecuteSingleModifyTask(scanState, task, hasReturning);
scanState->finishedRemoteScan = true;
}
if (requiresMasterEvaluation)
resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot;
}
/*
 * ProcessMasterEvaluableFunctions does nothing unless the job is flagged with
 * requiresMasterEvaluation. For flagged jobs it runs
 * ExecuteMasterEvaluableFunctions() on the job query and then calls
 * RebuildQueryStrings() with that query and the job's task list, so the
 * query strings in the tasks are rebuilt.
 */
static void
ProcessMasterEvaluableFunctions(Job *workerJob)
{
	Query *query = NULL;
	List *tasks = NIL;

	if (!workerJob->requiresMasterEvaluation)
	{
		return;
	}

	query = workerJob->jobQuery;
	tasks = workerJob->taskList;

	ExecuteMasterEvaluableFunctions(query);
	RebuildQueryStrings(query, tasks);
}
if (list_length(taskList) == 1)
{
Task *task = (Task *) linitial(taskList);
if (isModificationQuery)
{
bool sendTuples = multiPlan->hasReturning;
ExecuteSingleModifyTask(scanState, task, sendTuples);
}
else
{
ExecuteSingleSelectTask(scanState, task);
}
}
else
{
bool sendTuples = multiPlan->hasReturning;
ExecuteMultipleTasks(scanState, taskList, isModificationQuery,
sendTuples);
}
/* mark underlying query as having executed */
scanState->finishedUnderlyingScan = true;
}
/* if the underlying query produced output, return it */
/*
* FIXME: centralize this into function to be shared between router and
* other executors?
/*
* RouterMultiModifyExecScan executes a list of tasks on remote nodes, retrieves
* the results and, if RETURNING is used, stores them in custom scan's tuple store.
* Then, it returns tuples one by one from this tuple store.
*/
if (scanState->tuplestorestate != NULL)
{
Tuplestorestate *tupleStore = scanState->tuplestorestate;
TupleTableSlot *
RouterMultiModifyExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
TupleTableSlot *resultSlot = NULL;
/* XXX: could trivially support backward scans here */
tuplestore_gettupleslot(tupleStore, true, false, resultSlot);
if (!scanState->finishedRemoteScan)
{
MultiPlan *multiPlan = scanState->multiPlan;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
bool hasReturning = multiPlan->hasReturning;
bool isModificationQuery = true;
ProcessMasterEvaluableFunctions(workerJob);
ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning);
scanState->finishedRemoteScan = true;
}
resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot;
}
/*
* RouterSelectExecScan executes a single select task on the remote node,
* retrieves the results and stores them in custom scan's tuple store. Then, it
* returns tuples one by one from this tuple store.
*/
TupleTableSlot *
RouterSelectExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
TupleTableSlot *resultSlot = NULL;
if (!scanState->finishedRemoteScan)
{
MultiPlan *multiPlan = scanState->multiPlan;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
Task *task = (Task *) linitial(taskList);
ProcessMasterEvaluableFunctions(workerJob);
ExecuteSingleSelectTask(scanState, task);
scanState->finishedRemoteScan = true;
}
return NULL;
resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot;
}
@ -512,8 +553,6 @@ RouterExecScan(CitusScanState *scanState)
static void
ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
{
TupleDesc tupleDescriptor =
scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
ParamListInfo paramListInfo =
scanState->customScanState.ss.ps.state->es_param_list_info;
List *taskPlacementList = task->taskPlacementList;
@ -547,8 +586,8 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
continue;
}
queryOK = StoreQueryResult(scanState, connection, tupleDescriptor,
dontFailOnError, &currentAffectedTupleCount);
queryOK = StoreQueryResult(scanState, connection, dontFailOnError,
&currentAffectedTupleCount);
if (queryOK)
{
return;
@ -569,21 +608,19 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
* framework), or errors out (failed on all placements).
*/
static void
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
bool expectResults)
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults)
{
CmdType operation = scanState->multiPlan->operation;
TupleDesc tupleDescriptor =
scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
EState *executorState = scanState->customScanState.ss.ps.state;
ParamListInfo paramListInfo = executorState->es_param_list_info;
bool resultsOK = false;
List *taskPlacementList = task->taskPlacementList;
List *connectionList = NIL;
ListCell *taskPlacementCell = NULL;
ListCell *connectionCell = NULL;
int64 affectedTupleCount = -1;
bool resultsOK = false;
bool gotResults = false;
char *queryString = task->queryString;
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
bool startedInTransaction =
@ -669,8 +706,8 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
*/
if (!gotResults && expectResults)
{
queryOK = StoreQueryResult(scanState, connection, tupleDescriptor,
failOnError, &currentAffectedTupleCount);
queryOK = StoreQueryResult(scanState, connection, failOnError,
&currentAffectedTupleCount);
}
else
{
@ -804,8 +841,6 @@ static void
ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults)
{
TupleDesc tupleDescriptor =
scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
EState *executorState = scanState->customScanState.ss.ps.state;
ParamListInfo paramListInfo = executorState->es_param_list_info;
int64 affectedTupleCount = -1;
@ -813,9 +848,8 @@ ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
/* can only support modifications right now */
Assert(isModificationQuery);
/* XXX: Seems very redundant to pass both scanState and tupleDescriptor */
affectedTupleCount = ExecuteModifyTasks(taskList, expectResults, paramListInfo,
scanState, tupleDescriptor);
scanState);
executorState->es_processed = affectedTupleCount;
}
@ -831,7 +865,7 @@ ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
int64
ExecuteModifyTasksWithoutResults(List *taskList)
{
return ExecuteModifyTasks(taskList, false, NULL, NULL, NULL);
return ExecuteModifyTasks(taskList, false, NULL, NULL);
}
@ -845,7 +879,7 @@ ExecuteModifyTasksWithoutResults(List *taskList)
*/
static int64
ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo,
CitusScanState *scanState, TupleDesc tupleDescriptor)
CitusScanState *scanState)
{
int64 totalAffectedTupleCount = 0;
ListCell *taskCell = NULL;
@ -929,8 +963,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
continue;
}
connection =
(MultiConnection *) list_nth(connectionList, placementIndex);
connection = (MultiConnection *) list_nth(connectionList, placementIndex);
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK)
@ -975,10 +1008,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
*/
if (placementIndex == 0 && expectResults)
{
Assert(scanState != NULL && tupleDescriptor != NULL);
Assert(scanState != NULL);
queryOK = StoreQueryResult(scanState, connection, tupleDescriptor,
failOnError, &currentAffectedTupleCount);
queryOK = StoreQueryResult(scanState, connection, failOnError,
&currentAffectedTupleCount);
}
else
{
@ -1184,13 +1217,17 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
*/
static bool
StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
TupleDesc tupleDescriptor, bool failOnError, int64 *rows)
bool failOnError, int64 *rows)
{
TupleDesc tupleDescriptor =
scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
Tuplestorestate *tupleStore = NULL;
List *targetList = scanState->customScanState.ss.ps.plan->targetlist;
uint32 expectedColumnCount = ExecCleanTargetListLength(targetList);
char **columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *));
Tuplestorestate *tupleStore = NULL;
bool randomAccess = true;
bool interTransactions = false;
bool commandFailed = false;
MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext,
"StoreQueryResult",
@ -1201,7 +1238,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
if (scanState->tuplestorestate == NULL)
{
scanState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
}
else if (!failOnError)
{
@ -1403,39 +1441,3 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
return gotResponse && !commandFailed;
}
/*
* RouterExecutorFinish cleans up after a distributed execution.
*/
void
RouterExecutorFinish(QueryDesc *queryDesc)
{
EState *estate = queryDesc->estate;
Assert(estate != NULL);
estate->es_finished = true;
}
/*
* RouterExecutorEnd cleans up the executor state after a distributed
* execution.
*/
void
RouterExecutorEnd(QueryDesc *queryDesc)
{
EState *estate = queryDesc->estate;
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
if (routerState->tuplestorestate)
{
tuplestore_end(routerState->tuplestorestate);
}
Assert(estate != NULL);
FreeExecutorState(estate);
queryDesc->estate = NULL;
queryDesc->totaltime = NULL;
}

View File

@ -84,12 +84,17 @@ static void ExplainXMLTag(const char *tagname, int flags, ExplainState *es);
static void ExplainJSONLineEnding(ExplainState *es);
static void ExplainYAMLLineStarting(ExplainState *es);
/*
* CitusExplainScan is a custom scan explain callback function which is used to
* print explain information of a Citus plan which includes both master and
* distributed plan.
*/
void
CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es)
{
CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan;
const char *executorName = NULL;
if (!ExplainDistributedQueries)
{
@ -99,44 +104,8 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
return;
}
/*
* XXX: can we get by without the open/close group somehow - then we'd not
* copy any code from explain.c? Seems unlikely.
*/
ExplainOpenGroup("Distributed Query", "Distributed Query", true, es);
/*
* XXX: might be worthwhile to put this somewhere central, e.g. for
* debugging output.
*/
switch (scanState->executorType)
{
case MULTI_EXECUTOR_ROUTER:
{
executorName = "Router";
}
break;
case MULTI_EXECUTOR_REAL_TIME:
{
executorName = "Real-Time";
}
break;
case MULTI_EXECUTOR_TASK_TRACKER:
{
executorName = "Task-Tracker";
}
break;
default:
{
executorName = "Other";
}
break;
}
ExplainPropertyText("Executor", executorName, es);
ExplainJob(multiPlan->workerJob, es);
ExplainCloseGroup("Distributed Query", "Distributed Query", true, es);

View File

@ -15,6 +15,7 @@
#include "distributed/multi_master_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/worker_protocol.h"
#include "nodes/makefuncs.h"
@ -34,7 +35,7 @@
* a target target list for the master node. This master target list keeps the
* temporary table's columns on the master node.
*/
List *
static List *
MasterTargetList(List *workerTargetList)
{
List *masterTargetList = NIL;
@ -164,67 +165,57 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
/*
* BuildSelectStatement builds the final select statement to run on the master
* node, before returning results to the user. The function first builds a scan
* statement for all results fetched to the master, and layers aggregation, sort
* node, before returning results to the user. The function first gets the custom
* scan node for all results fetched to the master, and layers aggregation, sort
* and limit plans on top of the scan statement if necessary.
*/
static PlannedStmt *
BuildSelectStatement(Query *masterQuery, char *masterTableName,
List *masterTargetList, CustomScan *dataScan)
BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan)
{
PlannedStmt *selectStatement = NULL;
RangeTblEntry *rangeTableEntry = NULL;
RangeTblEntry *queryRangeTableEntry = NULL;
RangeTblEntry *customScanRangeTableEntry = NULL;
Agg *aggregationPlan = NULL;
Plan *topLevelPlan = NULL;
ListCell *lc = NULL;
List *columnNames = NULL;
/* (0) compute column names */
foreach(lc, masterTargetList)
{
TargetEntry *te = lfirst(lc);
columnNames = lappend(columnNames, makeString(te->resname));
}
ListCell *targetEntryCell = NULL;
List *columnNameList = NULL;
/* (1) make PlannedStmt and set basic information */
selectStatement = makeNode(PlannedStmt);
selectStatement->canSetTag = true;
selectStatement->relationOids = NIL; /* to be filled in exec_Start */
selectStatement->relationOids = NIL;
selectStatement->commandType = CMD_SELECT;
/* prepare the range table entry for our temporary table */
/* top level select query should have only one range table entry */
Assert(list_length(masterQuery->rtable) == 1);
queryRangeTableEntry = (RangeTblEntry *) linitial(masterQuery->rtable);
rangeTableEntry = copyObject(queryRangeTableEntry);
rangeTableEntry->rtekind = RTE_VALUES; /* can't look up relation */
rangeTableEntry->eref = makeAlias("remote scan", columnNames);
rangeTableEntry->inh = false;
rangeTableEntry->inFromCl = true;
/* compute column names for the custom range table entry */
foreach(targetEntryCell, masterTargetList)
{
TargetEntry *targetEntry = lfirst(targetEntryCell);
columnNameList = lappend(columnNameList, makeString(targetEntry->resname));
}
customScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList);
/* set the single element range table list */
selectStatement->rtable = list_make1(rangeTableEntry);
selectStatement->rtable = list_make1(customScanRangeTableEntry);
/* (2) build and initialize sequential scan node */
/* Gone */
/* (3) add an aggregation plan if needed */
/* (2) add an aggregation plan if needed */
if (masterQuery->hasAggs || masterQuery->groupClause)
{
dataScan->scan.plan.targetlist = masterTargetList;
remoteScan->scan.plan.targetlist = masterTargetList;
aggregationPlan = BuildAggregatePlan(masterQuery, &dataScan->scan.plan);
aggregationPlan = BuildAggregatePlan(masterQuery, &remoteScan->scan.plan);
topLevelPlan = (Plan *) aggregationPlan;
}
else
{
/* otherwise set the final projections on the scan plan directly */
dataScan->scan.plan.targetlist = masterQuery->targetList;
topLevelPlan = &dataScan->scan.plan;
remoteScan->scan.plan.targetlist = masterQuery->targetList;
topLevelPlan = &remoteScan->scan.plan;
}
/* (4) add a sorting plan if needed */
/* (3) add a sorting plan if needed */
if (masterQuery->sortClause)
{
List *sortClauseList = masterQuery->sortClause;
@ -242,7 +233,7 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
topLevelPlan = (Plan *) sortPlan;
}
/* (5) add a limit plan if needed */
/* (4) add a limit plan if needed */
if (masterQuery->limitCount || masterQuery->limitOffset)
{
Node *limitCount = masterQuery->limitCount;
@ -259,7 +250,7 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
topLevelPlan = (Plan *) limitPlan;
}
/* (6) finally set our top level plan in the plan tree */
/* (5) finally set our top level plan in the plan tree */
selectStatement->planTree = topLevelPlan;
return selectStatement;
@ -267,24 +258,24 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
/*
* MasterNodeSelectPlan takes in a distributed plan, finds the master node query
* structure in that plan, and builds the final select plan to execute on the
* master node. Note that this select plan is executed after result files are
* retrieved from worker nodes and are merged into a temporary table.
* MasterNodeSelectPlan takes in a distributed plan and a custom scan node which
* wraps remote part of the plan. This function finds the master node query
* structure in the multi plan, and builds the final select plan to execute on
* the tuples returned by remote scan on the master node. Note that this select
* plan is executed after result files are retrieved from worker nodes and
* filled into the tuple store inside provided custom scan.
*/
PlannedStmt *
MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *dataScan)
MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *remoteScan)
{
Query *masterQuery = multiPlan->masterQuery;
char *tableName = multiPlan->masterTableName;
PlannedStmt *masterSelectPlan = NULL;
Job *workerJob = multiPlan->workerJob;
List *workerTargetList = workerJob->jobQuery->targetList;
List *masterTargetList = MasterTargetList(workerTargetList);
masterSelectPlan =
BuildSelectStatement(masterQuery, tableName, masterTargetList, dataScan);
masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, remoteScan);
return masterSelectPlan;
}

View File

@ -196,9 +196,7 @@ MultiPlan *
MultiPhysicalPlanCreate(MultiTreeRoot *multiTree)
{
MultiPlan *multiPlan = NULL;
StringInfo jobSchemaName = NULL;
Job *workerJob = NULL;
uint64 workerJobId = 0;
Query *masterQuery = NULL;
List *masterDependedJobList = NIL;
@ -207,10 +205,6 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree)
/* create the tree of executable tasks for the worker job */
workerJob = BuildJobTreeTaskList(workerJob);
workerJobId = workerJob->jobId;
/* get job schema name */
jobSchemaName = JobSchemaName(workerJobId);
/* build the final merge query to execute on the master */
masterDependedJobList = list_make1(workerJob);
@ -219,7 +213,6 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree)
multiPlan = CitusMakeNode(MultiPlan);
multiPlan->workerJob = workerJob;
multiPlan->masterQuery = masterQuery;
multiPlan->masterTableName = jobSchemaName->data;
multiPlan->routerExecutable = MultiPlanRouterExecutable(multiPlan);
multiPlan->operation = CMD_SELECT;

View File

@ -23,30 +23,48 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_master_planner.h"
#include "distributed/multi_router_planner.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planner.h"
#include "utils/memutils.h"
static List *relationRestrictionContextList = NIL;
/*
 * Custom scan methods, one set per executor. In each initializer the string
 * is the provider name shown by EXPLAIN for the node, for example
 * "Custom Scan (Citus Real-Time)", and the function is the create-scan
 * callback registered for that executor. FinalizePlan() attaches exactly one
 * of these to the CustomScan node it builds, chosen by executor type.
 */

/* used for plans run by the real-time executor */
static CustomScanMethods RealTimeCustomScanMethods = {
"Citus Real-Time",
RealTimeCreateScan
};

/* used for plans run by the task-tracker executor */
static CustomScanMethods TaskTrackerCustomScanMethods = {
"Citus Task-Tracker",
TaskTrackerCreateScan
};

/* used for plans run by the router executor */
static CustomScanMethods RouterCustomScanMethods = {
"Citus Router",
RouterCreateScan
};

/*
 * Fallback used when no executor type was determined for the plan, which is
 * the case when the distributed plan carries a planning error.
 */
static CustomScanMethods DelayedErrorCustomScanMethods = {
"Citus Delayed Error",
DelayedErrorCreateScan
};
/* local function forward declarations */
static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery,
Query *query, ParamListInfo boundParams,
RelationRestrictionContext *restrictionContext);
static Node * SerializeMultiPlan(struct MultiPlan *multiPlan);
static MultiPlan * DeserializeMultiPlan(Node *node);
static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan);
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan,
CustomScan *customScan);
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
static void CheckNodeIsDumpable(Node *node);
static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
struct MultiPlan *multiPlan);
static struct PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan,
Query *originalQuery,
Query *query,
ParamListInfo boundParams,
RelationRestrictionContext *
restrictionContext);
static RelationRestrictionContext * CreateAndPushRestrictionContext(void);
static RelationRestrictionContext * CurrentRestrictionContext(void);
static void PopRestrictionContext(void);
@ -144,22 +162,21 @@ IsModifyCommand(Query *query)
/*
* VerifyMultiPlanValidity verifies that multiPlan is ready for execution, or
* errors out if not.
*
* A plan may e.g. not be ready for execution because CreateDistributedPlan()
* couldn't find a plan due to unresolved prepared statement parameters, but
* didn't error out, because we expect custom plans to come to our rescue.
* But sql (not plpgsql) functions unfortunately don't go through a codepath
* supporting custom plans.
* IsModifyMultiPlan returns true if the multi plan performs modifications,
* false otherwise.
*/
void
VerifyMultiPlanValidity(MultiPlan *multiPlan)
bool
IsModifyMultiPlan(MultiPlan *multiPlan)
{
if (multiPlan->planningError)
bool isModifyMultiPlan = false;
CmdType operation = multiPlan->operation;
if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE)
{
RaiseDeferredError(multiPlan->planningError, ERROR);
isModifyMultiPlan = true;
}
return isModifyMultiPlan;
}
@ -274,8 +291,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
RaiseDeferredError(distributedPlan->planningError, ERROR);
}
/* store required data into the planned statement */
resultPlan = MultiQueryContainerNode(localPlan, distributedPlan);
/* create final plan by combining local plan with distributed plan */
resultPlan = FinalizePlan(localPlan, distributedPlan);
/*
* As explained above, force planning costs to be unrealistically high if
@ -294,12 +311,6 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
}
static CustomScanMethods CitusCustomScanMethods = {
"CitusScan",
CitusCreateScan
};
/*
* GetMultiPlan returns the associated MultiPlan for a CustomScan.
*/
@ -308,41 +319,34 @@ GetMultiPlan(CustomScan *customScan)
{
MultiPlan *multiPlan = NULL;
Assert(IsA(customScan, CustomScan));
Assert(customScan->methods == &CitusCustomScanMethods);
Assert(list_length(customScan->custom_private) == 1);
multiPlan = DeSerializeMultiPlan(linitial(customScan->custom_private));
multiPlan = DeserializeMultiPlan(linitial(customScan->custom_private));
return multiPlan;
}
/* Does the passed in statement require distributed execution? */
bool
HasCitusToplevelNode(PlannedStmt *result)
{
elog(ERROR, "gone");
}
Node *
SerializableMultiPlan(MultiPlan *multiPlan)
{
/*
* FIXME: This should be improved for 9.6+, we we can copy trees
* efficiently. I.e. we should introduce copy support for relevant node
* types, and just return the MultiPlan as-is for 9.6.
/*
* SerializeMultiPlan returns the string representing the distributed plan in a
* Const node.
*
* Note that this should be improved for 9.6+, where we can copy trees efficiently.
* I.e. we should introduce copy support for relevant node types, and just
* return the MultiPlan as-is for 9.6.
*/
char *serializedPlan = NULL;
static Node *
SerializeMultiPlan(MultiPlan *multiPlan)
{
char *serializedMultiPlan = NULL;
Const *multiPlanData = NULL;
serializedPlan = CitusNodeToString(multiPlan);
serializedMultiPlan = CitusNodeToString(multiPlan);
multiPlanData = makeNode(Const);
multiPlanData->consttype = CSTRINGOID;
multiPlanData->constlen = strlen(serializedPlan);
multiPlanData->constvalue = CStringGetDatum(serializedPlan);
multiPlanData->constlen = strlen(serializedMultiPlan);
multiPlanData->constvalue = CStringGetDatum(serializedMultiPlan);
multiPlanData->constbyval = false;
multiPlanData->location = -1;
@ -350,8 +354,12 @@ SerializableMultiPlan(MultiPlan *multiPlan)
}
MultiPlan *
DeSerializeMultiPlan(Node *node)
/*
* DeserializeMultiPlan returns the deserialized distributed plan from the string
* representation in a Const node.
*/
static MultiPlan *
DeserializeMultiPlan(Node *node)
{
Const *multiPlanData = NULL;
char *serializedMultiPlan = NULL;
@ -369,107 +377,171 @@ DeSerializeMultiPlan(Node *node)
/*
* CreateCitusToplevelNode creates the top-level planTree node for a
* distributed statement. That top-level node is a) recognizable by the
* executor hooks, allowing them to redirect execution, b) contains the
* parameters required for distributed execution.
*
* The exact representation of the top-level node is an implementation detail
* which should not be referred to outside this file, as it's likely to become
* version dependant. Use GetMultiPlan() and HasCitusToplevelNode() to access.
*
* FIXME
*
* Internally the data is stored as arguments to a 'citus_extradata_container'
* function, which has to be removed from the really executed plan tree before
* query execution.
* FinalizePlan combines local plan with distributed plan and creates a plan
* which can be run by the PostgreSQL executor.
*/
PlannedStmt *
MultiQueryContainerNode(PlannedStmt *originalPlan, MultiPlan *multiPlan)
static PlannedStmt *
FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
{
PlannedStmt *resultPlan = NULL;
PlannedStmt *finalPlan = NULL;
CustomScan *customScan = makeNode(CustomScan);
Node *multiPlanData = SerializableMultiPlan(multiPlan);
Node *multiPlanData = NULL;
MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST;
customScan->methods = &CitusCustomScanMethods;
customScan->custom_private = list_make1(multiPlanData);
/* FIXME: This probably ain't correct */
if (ExecSupportsBackwardScan(originalPlan->planTree))
if (!multiPlan->planningError)
{
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
executorType = JobExecutorType(multiPlan);
}
/*
* FIXME: these two branches/pieces of code should probably be moved into
* router / logical planner code respectively.
*/
switch (executorType)
{
case MULTI_EXECUTOR_REAL_TIME:
{
customScan->methods = &RealTimeCustomScanMethods;
break;
}
case MULTI_EXECUTOR_TASK_TRACKER:
{
customScan->methods = &TaskTrackerCustomScanMethods;
break;
}
case MULTI_EXECUTOR_ROUTER:
{
customScan->methods = &RouterCustomScanMethods;
break;
}
default:
{
customScan->methods = &DelayedErrorCustomScanMethods;
break;
}
}
multiPlanData = SerializeMultiPlan(multiPlan);
customScan->custom_private = list_make1(multiPlanData);
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
/* check if we have a master query */
if (multiPlan->masterQuery)
{
resultPlan = MasterNodeSelectPlan(multiPlan, customScan);
resultPlan->queryId = originalPlan->queryId;
resultPlan->utilityStmt = originalPlan->utilityStmt;
finalPlan = FinalizeNonRouterPlan(localPlan, multiPlan, customScan);
}
else
{
ListCell *lc = NULL;
finalPlan = FinalizeRouterPlan(localPlan, customScan);
}
return finalPlan;
}
/*
 * FinalizeNonRouterPlan builds the final plan for queries run by the real-time
 * and task-tracker executors. It asks the master planner for the select plan
 * that sits on top of the given distributed custom scan, and then carries the
 * query id and utility statement of the local plan over to that plan.
 */
static PlannedStmt *
FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan,
					  CustomScan *customScan)
{
	PlannedStmt *masterSelectPlan = MasterNodeSelectPlan(multiPlan, customScan);

	/* keep the identifying fields of the plan produced for the local query */
	masterSelectPlan->queryId = localPlan->queryId;
	masterSelectPlan->utilityStmt = localPlan->utilityStmt;

	return masterSelectPlan;
}
/*
* FinalizeRouterPlan gets a CustomScan node which already wraps the distributed
* part of a router plan and sets it as the direct child of the router plan,
* because we don't run any query on the master node for router executable
* queries. Here, we also rebuild the column list to read from the remote scan.
*/
static PlannedStmt *
FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan)
{
PlannedStmt *routerPlan = NULL;
RangeTblEntry *remoteScanRangeTableEntry = NULL;
ListCell *targetEntryCell = NULL;
List *targetList = NIL;
bool foundJunk = false;
RangeTblEntry *rangeTableEntry = NULL;
List *columnNames = NIL;
int newRTI = list_length(originalPlan->rtable) + 1;
List *columnNameList = NIL;
/*
* XXX: This basically just builds a targetlist to "read" from the
* custom scan output.
*/
foreach(lc, originalPlan->planTree->targetlist)
/* we will have only one range table entry */
int customScanRangeTableIndex = 1;
/* build a targetlist to read from the custom scan output */
foreach(targetEntryCell, localPlan->planTree->targetlist)
{
TargetEntry *te = lfirst(lc);
Var *newVar = NULL;
TargetEntry *targetEntry = lfirst(targetEntryCell);
TargetEntry *newTargetEntry = NULL;
Var *newVar = NULL;
Value *columnName = NULL;
Assert(IsA(te, TargetEntry));
Assert(IsA(targetEntry, TargetEntry));
/*
* XXX: I can't think of a case where we'd need resjunk stuff at
* the toplevel of a router query - all things needing it have
* been pushed down.
* This is unlikely to be hit because we would not need resjunk stuff
* at the toplevel of a router query - all things needing it have been
* pushed down.
*/
if (te->resjunk)
if (targetEntry->resjunk)
{
foundJunk = true;
continue;
}
if (foundJunk)
{
ereport(ERROR, (errmsg("unexpected !junk entry after resjunk entry")));
}
/* build TE pointing to custom scan */
newVar = makeVarFromTargetEntry(newRTI, te);
newTargetEntry = flatCopyTargetEntry(te);
/* build target entry pointing to remote scan range table entry */
newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry);
newTargetEntry = flatCopyTargetEntry(targetEntry);
newTargetEntry->expr = (Expr *) newVar;
targetList = lappend(targetList, newTargetEntry);
columnNames = lappend(columnNames, makeString(te->resname));
columnName = makeString(targetEntry->resname);
columnNameList = lappend(columnNameList, columnName);
}
/* XXX: can't think of a better RTE type than VALUES */
rangeTableEntry = makeNode(RangeTblEntry);
rangeTableEntry->rtekind = RTE_VALUES; /* can't look up relation */
rangeTableEntry->eref = makeAlias("remote_scan", columnNames);
rangeTableEntry->inh = false;
rangeTableEntry->inFromCl = true;
resultPlan = originalPlan;
resultPlan->planTree = (Plan *) customScan;
resultPlan->rtable = lappend(resultPlan->rtable, rangeTableEntry);
customScan->scan.plan.targetlist = targetList;
}
return resultPlan;
routerPlan = makeNode(PlannedStmt);
routerPlan->planTree = (Plan *) customScan;
remoteScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList);
routerPlan->rtable = list_make1(remoteScanRangeTableEntry);
routerPlan->canSetTag = true;
routerPlan->relationOids = NIL;
routerPlan->queryId = localPlan->queryId;
routerPlan->utilityStmt = localPlan->utilityStmt;
routerPlan->commandType = localPlan->commandType;
routerPlan->hasReturning = localPlan->hasReturning;
return routerPlan;
}
/*
 * RemoteScanRangeTableEntry builds the range table entry that stands for a
 * remote scan. The entry is aliased as "remote_scan" and its columns are named
 * after the entries of the given column name list.
 */
RangeTblEntry *
RemoteScanRangeTableEntry(List *columnNameList)
{
	RangeTblEntry *rangeTableEntry = makeNode(RangeTblEntry);
	Alias *remoteScanAlias = makeAlias("remote_scan", columnNameList);

	/* a custom scan has no relation we could look up, hence RTE_VALUES */
	rangeTableEntry->rtekind = RTE_VALUES;
	rangeTableEntry->inFromCl = true;
	rangeTableEntry->inh = false;
	rangeTableEntry->eref = remoteScanAlias;

	return rangeTableEntry;
}

View File

@ -236,9 +236,13 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
multiPlan->workerJob = job;
multiPlan->masterQuery = NULL;
multiPlan->masterTableName = NULL;
multiPlan->routerExecutable = true;
multiPlan->hasReturning = list_length(originalQuery->returningList) > 0;
multiPlan->hasReturning = false;
if (list_length(originalQuery->returningList) > 0)
{
multiPlan->hasReturning = true;
}
return multiPlan;
}
@ -321,10 +325,14 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
/* and finally the multi plan */
multiPlan->workerJob = workerJob;
multiPlan->masterTableName = NULL;
multiPlan->masterQuery = NULL;
multiPlan->routerExecutable = true;
multiPlan->hasReturning = list_length(originalQuery->returningList) > 0;
multiPlan->hasReturning = false;
if (list_length(originalQuery->returningList) > 0)
{
multiPlan->hasReturning = true;
}
return multiPlan;
}

View File

@ -114,9 +114,7 @@ _PG_init(void)
* (thus as the innermost/last running hook) to be able to do our
* duties. For simplicity insist that all hooks are previously unused.
*/
if (planner_hook != NULL ||
ExplainOneQuery_hook != NULL ||
ProcessUtility_hook != NULL)
if (planner_hook != NULL || ProcessUtility_hook != NULL)
{
ereport(ERROR, (errmsg("Citus has to be loaded first"),
errhint("Place citus at the beginning of "

View File

@ -281,7 +281,6 @@ OutMultiPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(workerJob);
WRITE_NODE_FIELD(masterQuery);
WRITE_STRING_FIELD(masterTableName);
WRITE_BOOL_FIELD(routerExecutable);
WRITE_NODE_FIELD(planningError);
}

View File

@ -188,7 +188,6 @@ ReadMultiPlan(READFUNC_ARGS)
READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery);
READ_STRING_FIELD(masterTableName);
READ_BOOL_FIELD(routerExecutable);
READ_NODE_FIELD(planningError);

View File

@ -17,9 +17,6 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
/* signal currently executed statement is a master select statement or router execution */
#define EXEC_FLAG_CITUS_MASTER_SELECT 0x100
#define EXEC_FLAG_CITUS_ROUTER_EXECUTOR 0x200
#if (PG_VERSION_NUM >= 90600)
#define tuplecount_t uint64
@ -30,23 +27,26 @@
typedef struct CitusScanState
{
CustomScanState customScanState;
MultiPlan *multiPlan;
MultiExecutorType executorType;
/* state for router */
bool finishedUnderlyingScan;
Tuplestorestate *tuplestorestate;
CustomScanState customScanState; /* underlying custom scan node */
MultiPlan *multiPlan; /* distributed execution plan */
MultiExecutorType executorType; /* distributed executor type */
bool finishedRemoteScan; /* flag to check if remote scan is finished */
Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */
} CitusScanState;
Node * CitusCreateScan(CustomScan *scan);
extern void CitusBeginScan(CustomScanState *node,
EState *estate,
int eflags);
extern TupleTableSlot * CitusExecScan(CustomScanState *node);
extern Node * RealTimeCreateScan(CustomScan *scan);
extern Node * TaskTrackerCreateScan(CustomScan *scan);
extern Node * RouterCreateScan(CustomScan *scan);
extern Node * DelayedErrorCreateScan(CustomScan *scan);
extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags);
extern TupleTableSlot * RealTimeExecScan(CustomScanState *node);
extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node);
extern void CitusEndScan(CustomScanState *node);
extern void CitusReScan(CustomScanState *node);
extern void CitusExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es);
extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct
ExplainState *es);
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
#endif /* MULTI_EXECUTOR_H */

View File

@ -24,6 +24,6 @@ struct MultiPlan;
struct CustomScan;
extern PlannedStmt * MasterNodeSelectPlan(struct MultiPlan *multiPlan,
struct CustomScan *dataScan);
extern List * MasterTargetList(List *workerTargetList);
#endif /* MULTI_MASTER_PLANNER_H */

View File

@ -213,13 +213,11 @@ typedef struct JoinSequenceNode
typedef struct MultiPlan
{
CitusNode type;
CmdType operation;
bool hasReturning;
bool hasReturning;
Job *workerJob;
Query *masterQuery;
char *masterTableName;
bool routerExecutable;
/*

View File

@ -51,14 +51,13 @@ typedef struct RelationShard
extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
ParamListInfo boundParams);
extern bool HasCitusToplevelNode(PlannedStmt *planStatement);
struct MultiPlan;
extern struct MultiPlan * GetMultiPlan(CustomScan *node);
extern Node * SerializableMultiPlan(struct MultiPlan *multiPlan);
extern struct MultiPlan * DeSerializeMultiPlan(Node *node);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index index, RangeTblEntry *rte);
extern bool IsModifyCommand(Query *query);
extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan);
extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
#endif /* MULTI_PLANNER_H */

View File

@ -35,15 +35,12 @@ typedef struct XactShardConnSet
extern bool AllModificationsCommutative;
extern bool EnableDeadlockPrevention;
extern void RouterBeginScan(CitusScanState *scanState);
extern TupleTableSlot * RouterExecScan(CitusScanState *scanState);
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList);
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc);
extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
extern TupleTableSlot * RouterSingleModifyExecScan(CustomScanState *node);
extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -43,8 +43,7 @@ Sort
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity
-> HashAggregate
Group Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -74,10 +73,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "CitusScan",
"Custom Plan Provider": "Citus Real-Time",
"Parallel Aware": false,
"Distributed Query": {
"Executor": "Real-Time",
"Job": {
"Task Count": 8,
"Tasks Shown": "One of 8",
@ -150,10 +148,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>CitusScan</Custom-Plan-Provider>
<Custom-Plan-Provider>Citus Real-Time</Custom-Plan-Provider>
<Parallel-Aware>false</Parallel-Aware>
<Distributed-Query>
<Executor>Real-Time</Executor>
<Job>
<Task-Count>8</Task-Count>
<Tasks-Shown>One of 8</Tasks-Shown>
@ -221,10 +218,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "CitusScan"
Custom Plan Provider: "Citus Real-Time"
Parallel Aware: false
Distributed Query:
Executor: "Real-Time"
Job:
Task Count: 8
Tasks Shown: "One of 8"
@ -253,8 +249,7 @@ Sort
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity
-> HashAggregate
Group Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -267,9 +262,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
Aggregate
Output: (sum("?column?") / (sum("?column?_1") / pg_catalog.sum("?column?_2")))
-> Custom Scan (CitusScan)
-> Custom Scan (Citus Real-Time)
Output: "?column?", "?column?_1", "?column?_2"
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -286,8 +280,7 @@ EXPLAIN (COSTS FALSE)
Limit
-> Sort
Sort Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -304,8 +297,7 @@ Limit
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -317,8 +309,7 @@ EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -333,8 +324,7 @@ Custom Scan (CitusScan)
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -348,8 +338,7 @@ Custom Scan (CitusScan)
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -368,8 +357,7 @@ t
EXPLAIN (COSTS FALSE)
CREATE TABLE explain_result AS
SELECT * FROM lineitem;
Custom Scan (CitusScan)
Executor: Real-Time
Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -381,10 +369,9 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
HAVING sum(l_quantity) > 100;
Aggregate
Output: (sum("?column?") / (sum("?column?_1") / pg_catalog.sum("?column?_2")))
Filter: (sum("remote scan".worker_column_4) > '100'::numeric)
-> Custom Scan (CitusScan)
Filter: (sum(remote_scan.worker_column_4) > '100'::numeric)
-> Custom Scan (Citus Real-Time)
Output: "?column?", "?column?_1", "?column?_2", worker_column_4
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -400,11 +387,10 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
HAVING l_quantity > (100 * random());
HashAggregate
Output: l_quantity
Group Key: "remote scan".l_quantity
Filter: (("remote scan".worker_column_2)::double precision > ('100'::double precision * random()))
-> Custom Scan (CitusScan)
Group Key: remote_scan.l_quantity
Filter: ((remote_scan.worker_column_2)::double precision > ('100'::double precision * random()))
-> Custom Scan (Citus Real-Time)
Output: l_quantity, worker_column_2
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -419,8 +405,7 @@ SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: All
-> Task
@ -455,8 +440,7 @@ SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 4
Tasks Shown: One of 4
-> Task
@ -473,8 +457,7 @@ EXPLAIN (COSTS FALSE)
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 1
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
@ -500,10 +483,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "CitusScan",
"Custom Plan Provider": "Citus Task-Tracker",
"Parallel Aware": false,
"Distributed Query": {
"Executor": "Task-Tracker",
"Job": {
"Task Count": 1,
"Tasks Shown": "None, not supported for re-partition queries",
@ -550,10 +532,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>CitusScan</Custom-Plan-Provider>
<Custom-Plan-Provider>Citus Task-Tracker</Custom-Plan-Provider>
<Parallel-Aware>false</Parallel-Aware>
<Distributed-Query>
<Executor>Task-Tracker</Executor>
<Job>
<Task-Count>1</Task-Count>
<Tasks-Shown>None, not supported for re-partition queries</Tasks-Shown>
@ -610,10 +591,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "CitusScan"
Custom Plan Provider: "Citus Task-Tracker"
Parallel Aware: false
Distributed Query:
Executor: "Task-Tracker"
Job:
Task Count: 1
Tasks Shown: "None, not supported for re-partition queries"
@ -639,8 +619,7 @@ Finalize Aggregate
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -652,8 +631,7 @@ PREPARE task_tracker_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 4
Tasks Shown: One of 4
-> Task
@ -664,8 +642,7 @@ Aggregate
SET citus.task_executor_type TO 'real-time';
PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
EXPLAIN EXECUTE router_executor_query;
Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
Executor: Router
Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
@ -678,8 +655,7 @@ PREPARE real_time_executor_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
Aggregate
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
@ -691,8 +667,7 @@ Aggregate
-- at least make sure to fail without crashing
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
EXPLAIN EXECUTE router_executor_query_param(5);
Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
Executor: Router
Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task

View File

@ -43,8 +43,7 @@ Sort
Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity
-> HashAggregate
Group Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -71,9 +70,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "CitusScan",
"Custom Plan Provider": "Citus Real-Time",
"Distributed Query": {
"Executor": "Real-Time",
"Job": {
"Task Count": 8,
"Tasks Shown": "One of 8",
@ -140,9 +138,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>CitusScan</Custom-Plan-Provider>
<Custom-Plan-Provider>Citus Real-Time</Custom-Plan-Provider>
<Distributed-Query>
<Executor>Real-Time</Executor>
<Job>
<Task-Count>8</Task-Count>
<Tasks-Shown>One of 8</Tasks-Shown>
@ -204,9 +201,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "CitusScan"
Custom Plan Provider: "Citus Real-Time"
Distributed Query:
Executor: "Real-Time"
Job:
Task Count: 8
Tasks Shown: "One of 8"
@ -232,8 +228,7 @@ Sort
Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity
-> HashAggregate
Group Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -246,9 +241,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
Aggregate
Output: (sum("?column?") / (sum("?column?_1") / sum("?column?_2")))
-> Custom Scan (CitusScan)
-> Custom Scan (Citus Real-Time)
Output: "?column?", "?column?_1", "?column?_2"
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -265,8 +259,7 @@ EXPLAIN (COSTS FALSE)
Limit
-> Sort
Sort Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -283,8 +276,7 @@ Limit
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -296,8 +288,7 @@ EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -312,8 +303,7 @@ Custom Scan (CitusScan)
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -327,8 +317,7 @@ Custom Scan (CitusScan)
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -347,8 +336,7 @@ t
EXPLAIN (COSTS FALSE)
CREATE TABLE explain_result AS
SELECT * FROM lineitem;
Custom Scan (CitusScan)
Executor: Real-Time
Custom Scan (Citus Real-Time)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -360,10 +348,9 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
HAVING sum(l_quantity) > 100;
Aggregate
Output: (sum("?column?") / (sum("?column?_1") / sum("?column?_2")))
Filter: (sum("remote scan".worker_column_4) > '100'::numeric)
-> Custom Scan (CitusScan)
Filter: (sum(remote_scan.worker_column_4) > '100'::numeric)
-> Custom Scan (Citus Real-Time)
Output: "?column?", "?column?_1", "?column?_2", worker_column_4
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -379,11 +366,10 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
HAVING l_quantity > (100 * random());
HashAggregate
Output: l_quantity
Group Key: "remote scan".l_quantity
Filter: (("remote scan".worker_column_2)::double precision > ('100'::double precision * random()))
-> Custom Scan (CitusScan)
Group Key: remote_scan.l_quantity
Filter: ((remote_scan.worker_column_2)::double precision > ('100'::double precision * random()))
-> Custom Scan (Citus Real-Time)
Output: l_quantity, worker_column_2
Executor: Real-Time
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -398,8 +384,7 @@ SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: All
-> Task
@ -434,8 +419,7 @@ SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 4
Tasks Shown: One of 4
-> Task
@ -452,8 +436,7 @@ EXPLAIN (COSTS FALSE)
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 1
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
@ -477,9 +460,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "CitusScan",
"Custom Plan Provider": "Citus Task-Tracker",
"Distributed Query": {
"Executor": "Task-Tracker",
"Job": {
"Task Count": 1,
"Tasks Shown": "None, not supported for re-partition queries",
@ -524,9 +506,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>CitusScan</Custom-Plan-Provider>
<Custom-Plan-Provider>Citus Task-Tracker</Custom-Plan-Provider>
<Distributed-Query>
<Executor>Task-Tracker</Executor>
<Job>
<Task-Count>1</Task-Count>
<Tasks-Shown>None, not supported for re-partition queries</Tasks-Shown>
@ -581,9 +562,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "CitusScan"
Custom Plan Provider: "Citus Task-Tracker"
Distributed Query:
Executor: "Task-Tracker"
Job:
Task Count: 1
Tasks Shown: "None, not supported for re-partition queries"
@ -610,8 +590,7 @@ Aggregate
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 8
Tasks Shown: One of 8
-> Task
@ -623,8 +602,7 @@ PREPARE task_tracker_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 4
Tasks Shown: One of 4
-> Task
@ -635,8 +613,7 @@ Aggregate
SET citus.task_executor_type TO 'real-time';
PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
EXPLAIN EXECUTE router_executor_query;
Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
Executor: Router
Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
@ -649,8 +626,7 @@ PREPARE real_time_executor_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
Aggregate
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
@ -662,8 +638,7 @@ Aggregate
-- at least make sure to fail without crashing
PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1;
EXPLAIN EXECUTE router_executor_query_param(5);
Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
Executor: Router
Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task

View File

@ -6,7 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 650000;
-- Set configuration to print table join order and pruned shards
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise
SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise
SET client_min_messages TO DEBUG2;
-- Create new table definitions for use in testing in distributed planning and
-- execution functionality. Also create indexes to boost performance.
@ -141,8 +141,8 @@ DEBUG: join prunable for intervals [13473,14947] and [4480,5986]
DEBUG: join prunable for intervals [13473,14947] and [8997,10560]
DEBUG: join prunable for intervals [13473,14947] and [10560,12036]
QUERY PLAN
--------------------------------------------------------------
Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
--------------------------------------------------------------------
Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(2 rows)
@ -157,9 +157,9 @@ EXPLAIN SELECT count(*) FROM lineitem, orders
OR (l_orderkey = o_orderkey AND l_quantity < 10);
LOG: join order: [ "lineitem" ][ local partition join "orders" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -178,9 +178,9 @@ EXPLAIN SELECT count(*) FROM orders, lineitem_hash
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -189,9 +189,9 @@ EXPLAIN SELECT count(*) FROM orders_hash, lineitem_hash
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -200,9 +200,9 @@ EXPLAIN SELECT count(*) FROM customer_hash, nation
WHERE c_nationkey = n_nationkey;
LOG: join order: [ "customer_hash" ][ broadcast join "nation" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -214,9 +214,9 @@ EXPLAIN SELECT count(*) FROM orders, lineitem, customer
WHERE o_custkey = l_partkey AND o_custkey = c_nationkey;
LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition join "customer" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -226,9 +226,9 @@ EXPLAIN SELECT count(*) FROM orders, customer_hash
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders" ][ dual partition join "customer_hash" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -238,9 +238,9 @@ EXPLAIN SELECT count(*) FROM orders_hash, customer
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders_hash" ][ single partition join "customer" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)

View File

@ -6,7 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 660000;
-- Enable configuration to print table join order
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise
SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise
SET client_min_messages TO LOG;
-- Change configuration to treat lineitem, orders, customer, and part tables as
-- large. The following queries are basically the same as the ones in tpch_small
@ -25,9 +25,9 @@ WHERE
and l_quantity < 24;
LOG: join order: [ "lineitem" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -56,12 +56,12 @@ ORDER BY
o_orderdate;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer" ]
QUERY PLAN
--------------------------------------------------------------------------
--------------------------------------------------------------------------------
Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: sum((sum(revenue))) DESC, o_orderdate
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: l_orderkey, o_orderdate, o_shippriority
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(6 rows)
@ -104,7 +104,7 @@ LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partit
Sort Key: sum((sum(revenue))) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(6 rows)
@ -140,9 +140,9 @@ WHERE
);
LOG: join order: [ "lineitem" ][ single partition join "part" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -159,10 +159,10 @@ GROUP BY
l_partkey;
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single partition join "part" ][ single partition join "customer" ]
QUERY PLAN
--------------------------------------------------------------------
--------------------------------------------------------------------------
HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: l_partkey
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(4 rows)

View File

@ -19,9 +19,9 @@ WHERE
and l_quantity < 24;
LOG: join order: [ "lineitem" ]
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -50,12 +50,12 @@ ORDER BY
o_orderdate;
LOG: join order: [ "orders" ][ broadcast join "customer" ][ local partition join "lineitem" ]
QUERY PLAN
--------------------------------------------------------------------------
-----------------------------------------------------------------------------
Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: sum((sum(revenue))) DESC, o_orderdate
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: l_orderkey, o_orderdate, o_shippriority
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(6 rows)
@ -98,7 +98,7 @@ LOG: join order: [ "orders" ][ broadcast join "customer" ][ broadcast join "nat
Sort Key: sum((sum(revenue))) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(6 rows)
@ -134,9 +134,9 @@ WHERE
);
LOG: join order: [ "lineitem" ][ broadcast join "part" ]
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)

View File

@ -101,9 +101,9 @@ EXPLAIN SELECT count(*)
DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}]
DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}]
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -113,9 +113,9 @@ EXPLAIN SELECT count(*)
DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)]
DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)]
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -126,9 +126,9 @@ EXPLAIN SELECT count(*)
DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6]
DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6]
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)

View File

@ -65,8 +65,7 @@ Sort
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity
-> HashAggregate
Group Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -96,10 +95,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "CitusScan",
"Custom Plan Provider": "Citus Real-Time",
"Parallel Aware": false,
"Distributed Query": {
"Executor": "Real-Time",
"Job": {
"Task Count": 16,
"Tasks Shown": "One of 16",
@ -173,10 +171,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>CitusScan</Custom-Plan-Provider>
<Custom-Plan-Provider>Citus Real-Time</Custom-Plan-Provider>
<Parallel-Aware>false</Parallel-Aware>
<Distributed-Query>
<Executor>Real-Time</Executor>
<Job>
<Task-Count>16</Task-Count>
<Tasks-Shown>One of 16</Tasks-Shown>
@ -244,10 +241,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "CitusScan"
Custom Plan Provider: "Citus Real-Time"
Parallel Aware: false
Distributed Query:
Executor: "Real-Time"
Job:
Task Count: 16
Tasks Shown: "One of 16"
@ -276,8 +272,7 @@ Sort
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity
-> HashAggregate
Group Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -291,9 +286,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem_mx;
Aggregate
Output: (sum("?column?") / (sum("?column?_1") / pg_catalog.sum("?column?_2")))
-> Custom Scan (CitusScan)
-> Custom Scan (Citus Real-Time)
Output: "?column?", "?column?_1", "?column?_2"
Executor: Real-Time
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -310,8 +304,7 @@ EXPLAIN (COSTS FALSE)
Limit
-> Sort
Sort Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -328,8 +321,7 @@ Limit
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem_mx VALUES(1,0);
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -341,8 +333,7 @@ EXPLAIN (COSTS FALSE)
UPDATE lineitem_mx
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -355,8 +346,7 @@ Custom Scan (CitusScan)
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_mx
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -368,8 +358,7 @@ Custom Scan (CitusScan)
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem_mx WHERE l_orderkey = 5;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -388,8 +377,7 @@ t
EXPLAIN (COSTS FALSE)
CREATE TABLE explain_result AS
SELECT * FROM lineitem_mx;
Custom Scan (CitusScan)
Executor: Real-Time
Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -400,8 +388,7 @@ SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: All
-> Task
@ -496,8 +483,7 @@ SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -514,8 +500,7 @@ EXPLAIN (COSTS FALSE)
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
@ -547,10 +532,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "CitusScan",
"Custom Plan Provider": "Citus Task-Tracker",
"Parallel Aware": false,
"Distributed Query": {
"Executor": "Task-Tracker",
"Job": {
"Task Count": 4,
"Tasks Shown": "None, not supported for re-partition queries",
@ -605,10 +589,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>CitusScan</Custom-Plan-Provider>
<Custom-Plan-Provider>Citus Task-Tracker</Custom-Plan-Provider>
<Parallel-Aware>false</Parallel-Aware>
<Distributed-Query>
<Executor>Task-Tracker</Executor>
<Job>
<Task-Count>4</Task-Count>
<Tasks-Shown>None, not supported for re-partition queries</Tasks-Shown>
@ -660,10 +643,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "CitusScan"
Custom Plan Provider: "Citus Task-Tracker"
Parallel Aware: false
Distributed Query:
Executor: "Task-Tracker"
Job:
Task Count: 4
Tasks Shown: "None, not supported for re-partition queries"

View File

@ -65,8 +65,7 @@ Sort
Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity
-> HashAggregate
Group Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -93,9 +92,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "CitusScan",
"Custom Plan Provider": "Citus Real-Time",
"Distributed Query": {
"Executor": "Real-Time",
"Job": {
"Task Count": 16,
"Tasks Shown": "One of 16",
@ -163,9 +161,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>CitusScan</Custom-Plan-Provider>
<Custom-Plan-Provider>Citus Real-Time</Custom-Plan-Provider>
<Distributed-Query>
<Executor>Real-Time</Executor>
<Job>
<Task-Count>16</Task-Count>
<Tasks-Shown>One of 16</Tasks-Shown>
@ -227,9 +224,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "CitusScan"
Custom Plan Provider: "Citus Real-Time"
Distributed Query:
Executor: "Real-Time"
Job:
Task Count: 16
Tasks Shown: "One of 16"
@ -255,8 +251,7 @@ Sort
Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity
-> HashAggregate
Group Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -270,9 +265,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem_mx;
Aggregate
Output: (sum("?column?") / (sum("?column?_1") / sum("?column?_2")))
-> Custom Scan (CitusScan)
-> Custom Scan (Citus Real-Time)
Output: "?column?", "?column?_1", "?column?_2"
Executor: Real-Time
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -289,8 +283,7 @@ EXPLAIN (COSTS FALSE)
Limit
-> Sort
Sort Key: l_quantity
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -307,8 +300,7 @@ Limit
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem_mx VALUES(1,0);
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -320,8 +312,7 @@ EXPLAIN (COSTS FALSE)
UPDATE lineitem_mx
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -334,8 +325,7 @@ Custom Scan (CitusScan)
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem_mx
WHERE l_orderkey = 1 AND l_partkey = 0;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -347,8 +337,7 @@ Custom Scan (CitusScan)
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem_mx WHERE l_orderkey = 5;
Custom Scan (CitusScan)
Executor: Router
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
@ -367,8 +356,7 @@ t
EXPLAIN (COSTS FALSE)
CREATE TABLE explain_result AS
SELECT * FROM lineitem_mx;
Custom Scan (CitusScan)
Executor: Real-Time
Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -379,8 +367,7 @@ SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (CitusScan)
Executor: Real-Time
-> Custom Scan (Citus Real-Time)
Task Count: 16
Tasks Shown: All
-> Task
@ -475,8 +462,7 @@ SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 16
Tasks Shown: One of 16
-> Task
@ -493,8 +479,7 @@ EXPLAIN (COSTS FALSE)
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
Aggregate
-> Custom Scan (CitusScan)
Executor: Task-Tracker
-> Custom Scan (Citus Task-Tracker)
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
@ -524,9 +509,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Custom Scan",
"Parent Relationship": "Outer",
"Custom Plan Provider": "CitusScan",
"Custom Plan Provider": "Citus Task-Tracker",
"Distributed Query": {
"Executor": "Task-Tracker",
"Job": {
"Task Count": 4,
"Tasks Shown": "None, not supported for re-partition queries",
@ -579,9 +563,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Custom Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Custom-Plan-Provider>CitusScan</Custom-Plan-Provider>
<Custom-Plan-Provider>Citus Task-Tracker</Custom-Plan-Provider>
<Distributed-Query>
<Executor>Task-Tracker</Executor>
<Job>
<Task-Count>4</Task-Count>
<Tasks-Shown>None, not supported for re-partition queries</Tasks-Shown>
@ -631,9 +614,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Custom Scan"
Parent Relationship: "Outer"
Custom Plan Provider: "CitusScan"
Custom Plan Provider: "Citus Task-Tracker"
Distributed Query:
Executor: "Task-Tracker"
Job:
Task Count: 4
Tasks Shown: "None, not supported for re-partition queries"

View File

@ -525,6 +525,12 @@ FETCH test_cursor; -- fetch one row after the last
---------+---------+---------+---------
(0 rows)
FETCH BACKWARD test_cursor;
value_1 | value_2 | value_3 | value_4
---------+---------+---------+--------------------------
2 | 2 | 2 | Fri Dec 02 00:00:00 2016
(1 row)
END;
-- table creation queries inside can be router plannable
CREATE TEMP TABLE temp_reference_test as

View File

@ -174,9 +174,8 @@ EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other
WHERE repartition_udt.pk > 1;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
QUERY PLAN
-------------------------------------------------------------
Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
Executor: Task-Tracker
--------------------------------------------------------------------
Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
@ -185,7 +184,7 @@ LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_ot
-> MapMergeJob
Map Task Count: 5
Merge Task Count: 4
(10 rows)
(9 rows)
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol

View File

@ -1312,6 +1312,12 @@ FETCH test_cursor;
11 | 1 | alamo | 1347
(1 row)
FETCH BACKWARD test_cursor;
id | author_id | title | word_count
----+-----------+----------+------------
1 | 1 | arsenous | 9572
(1 row)
END;
-- queries inside copy can be router plannable
COPY (
@ -1454,7 +1460,10 @@ CONTEXT: SQL statement "SELECT ah.id, ah.word_count
WHERE author_id = 1"
PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY
DEBUG: Plan is router executable
CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY
CONTEXT: SQL statement "SELECT ah.id, ah.word_count
FROM articles_hash_mx ah
WHERE author_id = 1"
PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY
id | word_count
----+------------
1 | 9572

View File

@ -37,6 +37,17 @@ FETCH test_cursor;
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
FETCH test_cursor;
n_nationkey | n_name | n_regionkey | n_comment
-------------+--------+-------------+-----------
(0 rows)
FETCH BACKWARD test_cursor;
n_nationkey | n_name | n_regionkey | n_comment
-------------+---------------------------+-------------+------------------------------------------------------------------------------
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
END;
-- test with search_path is set
SET search_path TO citus_mx_test_schema;
@ -51,6 +62,17 @@ FETCH test_cursor;
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
FETCH test_cursor;
n_nationkey | n_name | n_regionkey | n_comment
-------------+--------+-------------+-----------
(0 rows)
FETCH BACKWARD test_cursor;
n_nationkey | n_name | n_regionkey | n_comment
-------------+---------------------------+-------------+------------------------------------------------------------------------------
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
END;
-- test inserting to table in different schema
SET search_path TO public;

View File

@ -171,9 +171,9 @@ INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename,
EXPLAIN SELECT count(*) FROM varchar_partitioned_table WHERE varchar_column = 'BA2';
DEBUG: predicate pruning for shardId 100
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -181,9 +181,9 @@ EXPLAIN SELECT count(*) FROM array_partitioned_table
WHERE array_column > '{BA1000U2AMO4ZGX, BZZXSP27F21T6}';
DEBUG: predicate pruning for shardId 102
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -191,9 +191,9 @@ EXPLAIN SELECT count(*) FROM composite_partitioned_table
WHERE composite_column < '(b,5,c)'::composite_type;
DEBUG: predicate pruning for shardId 105
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)

View File

@ -571,6 +571,12 @@ FETCH test_cursor; -- fetch one row after the last
---------+---------+---------+---------
(0 rows)
FETCH BACKWARD test_cursor;
value_1 | value_2 | value_3 | value_4
---------+---------+---------+--------------------------
2 | 2 | 2 | Fri Dec 02 00:00:00 2016
(1 row)
END;
-- table creation queries inside can be router plannable
CREATE TEMP TABLE temp_reference_test as

View File

@ -182,9 +182,8 @@ EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other
WHERE repartition_udt.pk > 1;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
QUERY PLAN
-------------------------------------------------------------
Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
Executor: Task-Tracker
--------------------------------------------------------------------
Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0)
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
@ -193,7 +192,7 @@ LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_ot
-> MapMergeJob
Map Task Count: 5
Merge Task Count: 4
(10 rows)
(9 rows)
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol

View File

@ -2057,6 +2057,12 @@ FETCH test_cursor; -- fetch one row after the last
----+-----------+-------+------------
(0 rows)
FETCH BACKWARD test_cursor;
id | author_id | title | word_count
----+-----------+----------+------------
41 | 1 | aznavour | 11814
(1 row)
END;
-- queries inside copy can be router plannable
COPY (
@ -2199,7 +2205,10 @@ CONTEXT: SQL statement "SELECT ah.id, ah.word_count
WHERE author_id = 1"
PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY
DEBUG: Plan is router executable
CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY
CONTEXT: SQL statement "SELECT ah.id, ah.word_count
FROM articles_hash ah
WHERE author_id = 1"
PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY
id | word_count
----+------------
1 | 9572

View File

@ -155,6 +155,18 @@ FETCH test_cursor;
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
FETCH test_cursor;
n_nationkey | n_name | n_regionkey | n_comment
-------------+---------------------------+-------------+------------------------------------------------------------------------------
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
FETCH BACKWARD test_cursor;
n_nationkey | n_name | n_regionkey | n_comment
-------------+---------------------------+-------------+------------------------------------------------------------------------------
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
END;
-- test with search_path is set
SET search_path TO test_schema_support;
@ -169,6 +181,18 @@ FETCH test_cursor;
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
FETCH test_cursor;
n_nationkey | n_name | n_regionkey | n_comment
-------------+---------------------------+-------------+------------------------------------------------------------------------------
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
FETCH BACKWARD test_cursor;
n_nationkey | n_name | n_regionkey | n_comment
-------------+---------------------------+-------------+------------------------------------------------------------------------------
1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon
(1 row)
END;
-- test inserting to table in different schema
SET search_path TO public;

View File

@ -61,9 +61,9 @@ DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -75,9 +75,9 @@ DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -94,9 +94,9 @@ DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -108,9 +108,9 @@ DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -141,9 +141,9 @@ DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -155,9 +155,9 @@ DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)
@ -169,9 +169,9 @@ DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
--------------------------------------------------------------------
-----------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
explain statements for distributed queries are not enabled
(3 rows)

View File

@ -261,6 +261,12 @@ FETCH ABSOLUTE 5 FROM noHoldCursor;
1 | 5 | 24.00 | 0.10
(1 row)
FETCH BACKWARD noHoldCursor;
l_orderkey | l_linenumber | l_quantity | l_discount
------------+--------------+------------+------------
1 | 4 | 28.00 | 0.09
(1 row)
COMMIT;
FETCH ABSOLUTE 5 FROM noHoldCursor;
ERROR: cursor "noholdcursor" does not exist

View File

@ -33,11 +33,9 @@ test: multi_agg_distinct multi_limit_clause multi_limit_clause_approximate
test: multi_average_expression multi_working_columns
test: multi_array_agg
test: multi_agg_type_conversion multi_count_type_conversion
test: multi_partition_pruning
test: multi_join_pruning multi_hash_pruning
test: multi_hash_pruning
test: multi_null_minmax_value_pruning
test: multi_query_directory_cleanup
test: multi_task_assignment_policy
test: multi_utility_statements
test: multi_dropped_column_aliases
@ -52,7 +50,7 @@ test: multi_tpch_query7 multi_tpch_query7_nested
# Parallel tests to check our join order planning logic. Note that we load data
# below; and therefore these tests should come after the execution tests.
# ----------
test: multi_join_order_tpch_small multi_join_order_additional
test: multi_join_order_additional
test: multi_load_more_data
test: multi_join_order_tpch_large

View File

@ -767,8 +767,7 @@ FROM
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
Executor: Real-Time
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
Task Count: 2
Tasks Shown: One of 2
-> Task
@ -786,7 +785,7 @@ FROM
Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id)
-> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556)
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
(20 rows)
(19 rows)
-- Union and left join subquery pushdown
EXPLAIN SELECT
@ -855,8 +854,7 @@ GROUP BY
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: hasdone
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
Executor: Real-Time
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
Task Count: 2
Tasks Shown: One of 2
-> Task
@ -894,7 +892,7 @@ GROUP BY
Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
-> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=80)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
(41 rows)
(40 rows)
-- Union, left join and having subquery pushdown
EXPLAIN SELECT
@ -1023,8 +1021,7 @@ LIMIT
Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: user_lastseen DESC
-> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0)
Executor: Real-Time
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
Task Count: 2
Tasks Shown: One of 2
-> Task
@ -1047,6 +1044,6 @@ LIMIT
Sort Key: events.event_time DESC
-> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524)
Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id)))
(27 rows)
(26 rows)
SET citus.enable_router_execution TO 'true';

View File

@ -765,9 +765,9 @@ FROM
tenant_id,
user_id) AS subquery;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Distributed Query into pg_merge_job_270014
Executor: Real-Time
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
Task Count: 2
Tasks Shown: One of 2
-> Task
@ -785,10 +785,7 @@ FROM
Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id)
-> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556)
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
Master Query
-> Aggregate (cost=0.00..0.00 rows=0 width=0)
-> Seq Scan on pg_merge_job_270014 (cost=0.00..0.00 rows=0 width=0)
(22 rows)
(19 rows)
-- Union and left join subquery pushdown
EXPLAIN SELECT
@ -854,9 +851,10 @@ FROM
GROUP BY
hasdone;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Distributed Query into pg_merge_job_270015
Executor: Real-Time
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: hasdone
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
Task Count: 2
Tasks Shown: One of 2
-> Task
@ -891,11 +889,7 @@ GROUP BY
Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
-> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=32)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
Master Query
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: intermediate_column_270015_2
-> Seq Scan on pg_merge_job_270015 (cost=0.00..0.00 rows=0 width=0)
(40 rows)
(37 rows)
-- Union, left join and having subquery pushdown
EXPLAIN SELECT
@ -1020,9 +1014,11 @@ ORDER BY
LIMIT
10;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Distributed Query into pg_merge_job_270017
Executor: Real-Time
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: user_lastseen DESC
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
Task Count: 2
Tasks Shown: One of 2
-> Task
@ -1045,11 +1041,6 @@ LIMIT
Sort Key: events.event_time DESC
-> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524)
Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id)))
Master Query
-> Limit (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: intermediate_column_270017_2 DESC
-> Seq Scan on pg_merge_job_270017 (cost=0.00..0.00 rows=0 width=0)
(29 rows)
(26 rows)
SET citus.enable_router_execution TO 'true';

View File

@ -11,7 +11,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 650000;
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise
SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise
SET client_min_messages TO DEBUG2;
-- Create new table definitions for use in testing in distributed planning and

View File

@ -11,7 +11,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 660000;
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise
SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise
SET client_min_messages TO LOG;
-- Change configuration to treat lineitem, orders, customer, and part tables as

View File

@ -304,6 +304,7 @@ DECLARE test_cursor CURSOR FOR
FETCH test_cursor;
FETCH ALL test_cursor;
FETCH test_cursor; -- fetch one row after the last
FETCH BACKWARD test_cursor;
END;
-- table creation queries inside can be router plannable

View File

@ -557,6 +557,7 @@ DECLARE test_cursor CURSOR FOR
ORDER BY id;
FETCH test_cursor;
FETCH test_cursor;
FETCH BACKWARD test_cursor;
END;
-- queries inside copy can be router plannable

View File

@ -21,6 +21,8 @@ DECLARE test_cursor CURSOR FOR
FROM nation_hash
WHERE n_nationkey = 1;
FETCH test_cursor;
FETCH test_cursor;
FETCH BACKWARD test_cursor;
END;
-- test with search_path is set
@ -31,6 +33,8 @@ DECLARE test_cursor CURSOR FOR
FROM nation_hash
WHERE n_nationkey = 1;
FETCH test_cursor;
FETCH test_cursor;
FETCH BACKWARD test_cursor;
END;

View File

@ -335,6 +335,7 @@ DECLARE test_cursor CURSOR FOR
FETCH test_cursor;
FETCH ALL test_cursor;
FETCH test_cursor; -- fetch one row after the last
FETCH BACKWARD test_cursor;
END;
-- table creation queries inside can be router plannable

View File

@ -917,6 +917,7 @@ DECLARE test_cursor CURSOR FOR
FETCH test_cursor;
FETCH ALL test_cursor;
FETCH test_cursor; -- fetch one row after the last
FETCH BACKWARD test_cursor;
END;
-- queries inside copy can be router plannable

View File

@ -113,6 +113,8 @@ DECLARE test_cursor CURSOR FOR
FROM test_schema_support.nation_append
WHERE n_nationkey = 1;
FETCH test_cursor;
FETCH test_cursor;
FETCH BACKWARD test_cursor;
END;
-- test with search_path is set
@ -123,6 +125,8 @@ DECLARE test_cursor CURSOR FOR
FROM nation_append
WHERE n_nationkey = 1;
FETCH test_cursor;
FETCH test_cursor;
FETCH BACKWARD test_cursor;
END;

View File

@ -146,5 +146,6 @@ DECLARE noHoldCursor SCROLL CURSOR FOR
ORDER BY l_orderkey, l_linenumber;
FETCH ABSOLUTE 5 FROM noHoldCursor;
FETCH BACKWARD noHoldCursor;
COMMIT;
FETCH ABSOLUTE 5 FROM noHoldCursor;