Refactor query execution functions

pull/1829/head
Marco Slot 2017-11-24 13:45:47 +01:00
parent 2d66bf5f16
commit 73989b07eb
3 changed files with 160 additions and 94 deletions

View File

@ -36,8 +36,6 @@
static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
Query *selectQuery, EState *executorState); Query *selectQuery, EState *executorState);
static void ExecuteIntoDestReceiver(Query *query, ParamListInfo params,
DestReceiver *dest);
/* /*
@ -137,50 +135,9 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
partitionColumnIndex, executorState, partitionColumnIndex, executorState,
stopOnFailure); stopOnFailure);
ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest); ExecuteQueryIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest);
executorState->es_processed = copyDest->tuplesSent; executorState->es_processed = copyDest->tuplesSent;
XactModificationLevel = XACT_MODIFICATION_DATA; XactModificationLevel = XACT_MODIFICATION_DATA;
} }
/*
* ExecuteIntoDestReceiver plans and executes a query and sends results to the given
* DestReceiver.
*/
static void
ExecuteIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest)
{
PlannedStmt *queryPlan = NULL;
Portal portal = NULL;
int eflags = 0;
int cursorOptions = 0;
long count = FETCH_ALL;
/* create a new portal for executing the query */
portal = CreateNewPortal();
/* don't display the portal in pg_cursors, it is for internal use only */
portal->visible = false;
cursorOptions = CURSOR_OPT_PARALLEL_OK;
/* plan the subquery, this may be another distributed query */
queryPlan = pg_plan_query(query, cursorOptions, params);
PortalDefineQuery(portal,
NULL,
"",
"SELECT",
list_make1(queryPlan),
NULL);
PortalStart(portal, params, eflags, GetActiveSnapshot());
#if (PG_VERSION_NUM >= 100000)
PortalRun(portal, count, false, true, dest, dest, NULL);
#else
PortalRun(portal, count, false, dest, dest, NULL);
#endif
PortalDrop(portal, false);
}

View File

@ -34,6 +34,7 @@
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "parser/parsetree.h" #include "parser/parsetree.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "tcop/pquery.h"
#include "tcop/utility.h" #include "tcop/utility.h"
#include "utils/snapmgr.h" #include "utils/snapmgr.h"
#include "utils/memutils.h" #include "utils/memutils.h"
@ -93,29 +94,13 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
{ {
CustomScanState customScanState = citusScanState->customScanState; CustomScanState customScanState = citusScanState->customScanState;
List *workerTaskList = workerJob->taskList; List *workerTaskList = workerJob->taskList;
List *copyOptions = NIL;
EState *executorState = NULL;
MemoryContext executorTupleContext = NULL;
ExprContext *executorExpressionContext = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Relation stubRelation = NULL;
ListCell *workerTaskCell = NULL; ListCell *workerTaskCell = NULL;
uint32 columnCount = 0;
Datum *columnValues = NULL;
bool *columnNulls = NULL;
bool randomAccess = true; bool randomAccess = true;
bool interTransactions = false; bool interTransactions = false;
char *copyFormat = "text";
executorState = citusScanState->customScanState.ss.ps.state;
executorTupleContext = GetPerTupleMemoryContext(executorState);
executorExpressionContext = GetPerTupleExprContext(executorState);
tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
stubRelation = StubRelation(tupleDescriptor);
columnCount = tupleDescriptor->natts;
columnValues = palloc0(columnCount * sizeof(Datum));
columnNulls = palloc0(columnCount * sizeof(bool));
Assert(citusScanState->tuplestorestate == NULL); Assert(citusScanState->tuplestorestate == NULL);
citusScanState->tuplestorestate = citusScanState->tuplestorestate =
@ -123,16 +108,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
if (BinaryMasterCopyFormat) if (BinaryMasterCopyFormat)
{ {
DefElem *copyOption = NULL; copyFormat = "binary";
#if (PG_VERSION_NUM >= 100000)
int location = -1; /* "unknown" token location */
copyOption = makeDefElem("format", (Node *) makeString("binary"), location);
#else
copyOption = makeDefElem("format", (Node *) makeString("binary"));
#endif
copyOptions = lappend(copyOptions, copyOption);
} }
foreach(workerTaskCell, workerTaskList) foreach(workerTaskCell, workerTaskList)
@ -140,42 +116,83 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
Task *workerTask = (Task *) lfirst(workerTaskCell); Task *workerTask = (Task *) lfirst(workerTaskCell);
StringInfo jobDirectoryName = NULL; StringInfo jobDirectoryName = NULL;
StringInfo taskFilename = NULL; StringInfo taskFilename = NULL;
CopyState copyState = NULL;
jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId);
ReadFileIntoTupleStore(taskFilename->data, copyFormat, tupleDescriptor,
citusScanState->tuplestorestate);
}
}
/*
* ReadFileIntoTupleStore parses the records in a COPY-formatted file according
* according to the given tuple descriptor and stores the records in a tuple
* store.
*/
void
ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor,
Tuplestorestate *tupstore)
{
CopyState copyState = NULL;
/*
* Trick BeginCopyFrom into using our tuple descriptor by pretending it belongs
* to a relation.
*/
Relation stubRelation = StubRelation(tupleDescriptor);
EState *executorState = CreateExecutorState();
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
int columnCount = tupleDescriptor->natts;
Datum *columnValues = palloc0(columnCount * sizeof(Datum));
bool *columnNulls = palloc0(columnCount * sizeof(bool));
DefElem *copyOption = NULL;
List *copyOptions = NIL;
#if (PG_VERSION_NUM >= 100000) #if (PG_VERSION_NUM >= 100000)
copyState = BeginCopyFrom(NULL, stubRelation, taskFilename->data, false, NULL, int location = -1; /* "unknown" token location */
NULL, copyOptions); copyOption = makeDefElem("format", (Node *) makeString(copyFormat), location);
#else #else
copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL, copyOption = makeDefElem("format", (Node *) makeString(copyFormat));
copyOptions); #endif
copyOptions = lappend(copyOptions, copyOption);
#if (PG_VERSION_NUM >= 100000)
copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL,
NULL, copyOptions);
#else
copyState = BeginCopyFrom(stubRelation, fileName, false, NULL,
copyOptions);
#endif #endif
while (true) while (true)
{
MemoryContext oldContext = NULL;
bool nextRowFound = false;
ResetPerTupleExprContext(executorState);
oldContext = MemoryContextSwitchTo(executorTupleContext);
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
columnValues, columnNulls, NULL);
if (!nextRowFound)
{ {
MemoryContext oldContext = NULL;
bool nextRowFound = false;
ResetPerTupleExprContext(executorState);
oldContext = MemoryContextSwitchTo(executorTupleContext);
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
columnValues, columnNulls, NULL);
if (!nextRowFound)
{
MemoryContextSwitchTo(oldContext);
break;
}
tuplestore_putvalues(citusScanState->tuplestorestate, tupleDescriptor,
columnValues, columnNulls);
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
break;
} }
EndCopyFrom(copyState); tuplestore_putvalues(tupstore, tupleDescriptor, columnValues, columnNulls);
MemoryContextSwitchTo(oldContext);
} }
EndCopyFrom(copyState);
pfree(columnValues);
pfree(columnNulls);
} }
@ -195,3 +212,86 @@ StubRelation(TupleDesc tupleDescriptor)
return stubRelation; return stubRelation;
} }
/*
* ExecuteQueryStringIntoDestReceiver plans and executes a query and sends results
* to the given DestReceiver.
*/
void
ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params,
DestReceiver *dest)
{
Query *query = NULL;
#if (PG_VERSION_NUM >= 100000)
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
List *queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL);
#else
Node *queryTreeNode = ParseTreeNode(queryString);
List *queryTreeList = pg_analyze_and_rewrite(queryTreeNode, queryString, NULL, 0);
#endif
if (list_length(queryTreeList) != 1)
{
ereport(ERROR, (errmsg("can only execute a single query")));
}
query = (Query *) linitial(queryTreeList);
ExecuteQueryIntoDestReceiver(query, params, dest);
}
/*
* ExecuteQueryIntoDestReceiver plans and executes a query and sends results to the given
* DestReceiver.
*/
void
ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest)
{
PlannedStmt *queryPlan = NULL;
int cursorOptions = 0;
cursorOptions = CURSOR_OPT_PARALLEL_OK;
/* plan the subquery, this may be another distributed query */
queryPlan = pg_plan_query(query, cursorOptions, params);
ExecutePlanIntoDestReceiver(queryPlan, params, dest);
}
/*
* ExecuteIntoDestReceiver plans and executes a query and sends results to the given
* DestReceiver.
*/
void
ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
DestReceiver *dest)
{
Portal portal = NULL;
int eflags = 0;
long count = FETCH_ALL;
/* create a new portal for executing the query */
portal = CreateNewPortal();
/* don't display the portal in pg_cursors, it is for internal use only */
portal->visible = false;
PortalDefineQuery(portal,
NULL,
"",
"SELECT",
list_make1(queryPlan),
NULL);
PortalStart(portal, params, eflags, GetActiveSnapshot());
#if (PG_VERSION_NUM >= 100000)
PortalRun(portal, count, false, true, dest, dest, NULL);
#else
PortalRun(portal, count, false, dest, dest, NULL);
#endif
PortalDrop(portal, false);
}

View File

@ -30,6 +30,15 @@ extern int MultiShardConnectionType;
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
tupleDescriptor, Tuplestorestate *tupstore);
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
params,
DestReceiver *dest);
extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params,
DestReceiver *dest);
extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
DestReceiver *dest);
#endif /* MULTI_EXECUTOR_H */ #endif /* MULTI_EXECUTOR_H */