mirror of https://github.com/citusdata/citus.git
Add some more comments.
parent
2b2e065acc
commit
de0082d7fc
|
|
@ -33,6 +33,16 @@
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FIXME: It'd probably be better to have different set of methods for:
|
||||||
|
* - router readonly queries
|
||||||
|
* - router modify
|
||||||
|
* - router insert ... select
|
||||||
|
* - real-time/task-tracker (no point in seperating those)
|
||||||
|
*
|
||||||
|
* I think it's better however to only have one type of CitusScanState, to
|
||||||
|
* allow to easily share code between routines.
|
||||||
|
*/
|
||||||
static CustomExecMethods CitusCustomExecMethods = {
|
static CustomExecMethods CitusCustomExecMethods = {
|
||||||
"CitusScan",
|
"CitusScan",
|
||||||
CitusBeginScan,
|
CitusBeginScan,
|
||||||
|
|
@ -141,8 +151,27 @@ CitusExecScan(CustomScanState *node)
|
||||||
}
|
}
|
||||||
|
|
||||||
tupleDescriptor = node->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
|
tupleDescriptor = node->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
|
||||||
fakeRel = palloc0(sizeof(RelationData));
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Load data, collected by Multi*Execute() above, into a
|
||||||
|
* tuplestore. For that first create a tuplestore, and then copy
|
||||||
|
* the files one-by-one.
|
||||||
|
*
|
||||||
|
* FIXME: Should probably be in a separate routine.
|
||||||
|
*
|
||||||
|
* Long term it'd be a lot better if Multi*Execute() directly
|
||||||
|
* filled the tuplestores, but that's a fair bit of work.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* To be able to use copy.c, we need a Relation descriptor. As
|
||||||
|
* there's no relation corresponding to the data loaded from
|
||||||
|
* workers, fake one. We just need the bare minimal set of fields
|
||||||
|
* accessed by BeginCopyFrom().
|
||||||
|
*
|
||||||
|
* FIXME: should be abstracted into a separate function.
|
||||||
|
*/
|
||||||
|
fakeRel = palloc0(sizeof(RelationData));
|
||||||
fakeRel->rd_att = tupleDescriptor;
|
fakeRel->rd_att = tupleDescriptor;
|
||||||
fakeRel->rd_rel = palloc0(sizeof(FormData_pg_class));
|
fakeRel->rd_rel = palloc0(sizeof(FormData_pg_class));
|
||||||
fakeRel->rd_rel->relkind = RELKIND_RELATION;
|
fakeRel->rd_rel->relkind = RELKIND_RELATION;
|
||||||
|
|
@ -231,5 +260,10 @@ CitusReScan(CustomScanState *node)
|
||||||
|
|
||||||
scanState->tuplestorestate = NULL;
|
scanState->tuplestorestate = NULL;
|
||||||
scanState->finishedUnderlyingScan = true;
|
scanState->finishedUnderlyingScan = true;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* XXX: this probably already works, but if not should be easily
|
||||||
|
* supportable - probably hard to exercise right now though.
|
||||||
|
*/
|
||||||
elog(WARNING, "unsupported at this point");
|
elog(WARNING, "unsupported at this point");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -442,6 +442,7 @@ RouterExecScan(CitusScanState *scanState)
|
||||||
bool isModificationQuery = false;
|
bool isModificationQuery = false;
|
||||||
CmdType operation = multiPlan->operation;
|
CmdType operation = multiPlan->operation;
|
||||||
|
|
||||||
|
/* should use IsModificationStmt or such */
|
||||||
if (operation == CMD_INSERT || operation == CMD_UPDATE ||
|
if (operation == CMD_INSERT || operation == CMD_UPDATE ||
|
||||||
operation == CMD_DELETE)
|
operation == CMD_DELETE)
|
||||||
{
|
{
|
||||||
|
|
@ -482,9 +483,16 @@ RouterExecScan(CitusScanState *scanState)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if the underlying query produced output, return it */
|
/* if the underlying query produced output, return it */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FIXME: centralize this into function to be shared between router and
|
||||||
|
* other executors?
|
||||||
|
*/
|
||||||
if (scanState->tuplestorestate != NULL)
|
if (scanState->tuplestorestate != NULL)
|
||||||
{
|
{
|
||||||
Tuplestorestate *tupleStore = scanState->tuplestorestate;
|
Tuplestorestate *tupleStore = scanState->tuplestorestate;
|
||||||
|
|
||||||
|
/* XXX: could trivially support backward scans here */
|
||||||
tuplestore_gettupleslot(tupleStore, true, false, resultSlot);
|
tuplestore_gettupleslot(tupleStore, true, false, resultSlot);
|
||||||
|
|
||||||
return resultSlot;
|
return resultSlot;
|
||||||
|
|
@ -805,6 +813,7 @@ ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
|
||||||
/* can only support modifications right now */
|
/* can only support modifications right now */
|
||||||
Assert(isModificationQuery);
|
Assert(isModificationQuery);
|
||||||
|
|
||||||
|
/* XXX: Seems very redundant to pass both scanState and tupleDescriptor */
|
||||||
affectedTupleCount = ExecuteModifyTasks(taskList, expectResults, paramListInfo,
|
affectedTupleCount = ExecuteModifyTasks(taskList, expectResults, paramListInfo,
|
||||||
scanState, tupleDescriptor);
|
scanState, tupleDescriptor);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -99,8 +99,16 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* XXX: can we get by without the open/close group somehow - then we'd not
|
||||||
|
* copy any code from explain.c? Seems unlikely.
|
||||||
|
*/
|
||||||
ExplainOpenGroup("Distributed Query", "Distributed Query", true, es);
|
ExplainOpenGroup("Distributed Query", "Distributed Query", true, es);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* XXX: might be worthwhile to put this somewhere central, e.g. for
|
||||||
|
* debugging output.
|
||||||
|
*/
|
||||||
switch (scanState->executorType)
|
switch (scanState->executorType)
|
||||||
{
|
{
|
||||||
case MULTI_EXECUTOR_ROUTER:
|
case MULTI_EXECUTOR_ROUTER:
|
||||||
|
|
|
||||||
|
|
@ -400,6 +400,10 @@ MultiQueryContainerNode(PlannedStmt *originalPlan, MultiPlan *multiPlan)
|
||||||
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
|
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FIXME: these two branches/pieces of code should probably be moved into
|
||||||
|
* router / logical planner code respectively.
|
||||||
|
*/
|
||||||
if (multiPlan->masterQuery)
|
if (multiPlan->masterQuery)
|
||||||
{
|
{
|
||||||
resultPlan = MasterNodeSelectPlan(multiPlan, customScan);
|
resultPlan = MasterNodeSelectPlan(multiPlan, customScan);
|
||||||
|
|
@ -415,6 +419,10 @@ MultiQueryContainerNode(PlannedStmt *originalPlan, MultiPlan *multiPlan)
|
||||||
List *columnNames = NIL;
|
List *columnNames = NIL;
|
||||||
int newRTI = list_length(originalPlan->rtable) + 1;
|
int newRTI = list_length(originalPlan->rtable) + 1;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* XXX: This basically just builds a targetlist to "read" from the
|
||||||
|
* custom scan output.
|
||||||
|
*/
|
||||||
foreach(lc, originalPlan->planTree->targetlist)
|
foreach(lc, originalPlan->planTree->targetlist)
|
||||||
{
|
{
|
||||||
TargetEntry *te = lfirst(lc);
|
TargetEntry *te = lfirst(lc);
|
||||||
|
|
@ -422,6 +430,12 @@ MultiQueryContainerNode(PlannedStmt *originalPlan, MultiPlan *multiPlan)
|
||||||
TargetEntry *newTargetEntry = NULL;
|
TargetEntry *newTargetEntry = NULL;
|
||||||
|
|
||||||
Assert(IsA(te, TargetEntry));
|
Assert(IsA(te, TargetEntry));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* XXX: I can't think of a case where we'd need resjunk stuff at
|
||||||
|
* the toplevel of a router query - all things needing it have
|
||||||
|
* been pushed down.
|
||||||
|
*/
|
||||||
if (te->resjunk)
|
if (te->resjunk)
|
||||||
{
|
{
|
||||||
foundJunk = true;
|
foundJunk = true;
|
||||||
|
|
@ -433,6 +447,7 @@ MultiQueryContainerNode(PlannedStmt *originalPlan, MultiPlan *multiPlan)
|
||||||
ereport(ERROR, (errmsg("unexpected !junk entry after resjunk entry")));
|
ereport(ERROR, (errmsg("unexpected !junk entry after resjunk entry")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* build TE pointing to custom scan */
|
||||||
newVar = makeVarFromTargetEntry(newRTI, te);
|
newVar = makeVarFromTargetEntry(newRTI, te);
|
||||||
newTargetEntry = flatCopyTargetEntry(te);
|
newTargetEntry = flatCopyTargetEntry(te);
|
||||||
newTargetEntry->expr = (Expr *) newVar;
|
newTargetEntry->expr = (Expr *) newVar;
|
||||||
|
|
@ -441,6 +456,7 @@ MultiQueryContainerNode(PlannedStmt *originalPlan, MultiPlan *multiPlan)
|
||||||
columnNames = lappend(columnNames, makeString(te->resname));
|
columnNames = lappend(columnNames, makeString(te->resname));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* XXX: can't think of a better RTE type than VALUES */
|
||||||
rangeTableEntry = makeNode(RangeTblEntry);
|
rangeTableEntry = makeNode(RangeTblEntry);
|
||||||
rangeTableEntry->rtekind = RTE_VALUES; /* can't look up relation */
|
rangeTableEntry->rtekind = RTE_VALUES; /* can't look up relation */
|
||||||
rangeTableEntry->eref = makeAlias("remote_scan", columnNames);
|
rangeTableEntry->eref = makeAlias("remote_scan", columnNames);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue