From 73989b07ebc21f9a96186168a2aa2192180bbdfc Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 24 Nov 2017 13:45:47 +0100 Subject: [PATCH] Refactor query execution functions --- .../executor/insert_select_executor.c | 45 +--- .../distributed/executor/multi_executor.c | 200 +++++++++++++----- src/include/distributed/multi_executor.h | 9 + 3 files changed, 160 insertions(+), 94 deletions(-) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index b71082747..9e55680e4 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -36,8 +36,6 @@ static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, Query *selectQuery, EState *executorState); -static void ExecuteIntoDestReceiver(Query *query, ParamListInfo params, - DestReceiver *dest); /* @@ -137,50 +135,9 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, partitionColumnIndex, executorState, stopOnFailure); - ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest); + ExecuteQueryIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest); executorState->es_processed = copyDest->tuplesSent; XactModificationLevel = XACT_MODIFICATION_DATA; } - - -/* - * ExecuteIntoDestReceiver plans and executes a query and sends results to the given - * DestReceiver. 
- */ -static void -ExecuteIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest) -{ - PlannedStmt *queryPlan = NULL; - Portal portal = NULL; - int eflags = 0; - int cursorOptions = 0; - long count = FETCH_ALL; - - /* create a new portal for executing the query */ - portal = CreateNewPortal(); - - /* don't display the portal in pg_cursors, it is for internal use only */ - portal->visible = false; - - cursorOptions = CURSOR_OPT_PARALLEL_OK; - - /* plan the subquery, this may be another distributed query */ - queryPlan = pg_plan_query(query, cursorOptions, params); - - PortalDefineQuery(portal, - NULL, - "", - "SELECT", - list_make1(queryPlan), - NULL); - - PortalStart(portal, params, eflags, GetActiveSnapshot()); -#if (PG_VERSION_NUM >= 100000) - PortalRun(portal, count, false, true, dest, dest, NULL); -#else - PortalRun(portal, count, false, dest, dest, NULL); -#endif - PortalDrop(portal, false); -} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 7216bc5f5..2a0835d1f 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -34,6 +34,7 @@ #include "nodes/makefuncs.h" #include "parser/parsetree.h" #include "storage/lmgr.h" +#include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/snapmgr.h" #include "utils/memutils.h" @@ -93,29 +94,13 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) { CustomScanState customScanState = citusScanState->customScanState; List *workerTaskList = workerJob->taskList; - List *copyOptions = NIL; - EState *executorState = NULL; - MemoryContext executorTupleContext = NULL; - ExprContext *executorExpressionContext = NULL; TupleDesc tupleDescriptor = NULL; - Relation stubRelation = NULL; ListCell *workerTaskCell = NULL; - uint32 columnCount = 0; - Datum *columnValues = NULL; - bool *columnNulls = NULL; bool randomAccess = true; bool interTransactions = 
false; - - executorState = citusScanState->customScanState.ss.ps.state; - executorTupleContext = GetPerTupleMemoryContext(executorState); - executorExpressionContext = GetPerTupleExprContext(executorState); + char *copyFormat = "text"; tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; - stubRelation = StubRelation(tupleDescriptor); - - columnCount = tupleDescriptor->natts; - columnValues = palloc0(columnCount * sizeof(Datum)); - columnNulls = palloc0(columnCount * sizeof(bool)); Assert(citusScanState->tuplestorestate == NULL); citusScanState->tuplestorestate = @@ -123,16 +108,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) if (BinaryMasterCopyFormat) { - DefElem *copyOption = NULL; - -#if (PG_VERSION_NUM >= 100000) - int location = -1; /* "unknown" token location */ - copyOption = makeDefElem("format", (Node *) makeString("binary"), location); -#else - copyOption = makeDefElem("format", (Node *) makeString("binary")); -#endif - - copyOptions = lappend(copyOptions, copyOption); + copyFormat = "binary"; } foreach(workerTaskCell, workerTaskList) @@ -140,42 +116,83 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) Task *workerTask = (Task *) lfirst(workerTaskCell); StringInfo jobDirectoryName = NULL; StringInfo taskFilename = NULL; - CopyState copyState = NULL; jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); + ReadFileIntoTupleStore(taskFilename->data, copyFormat, tupleDescriptor, + citusScanState->tuplestorestate); + } +} + + +/* + * ReadFileIntoTupleStore parses the records in a COPY-formatted file according + * to the given tuple descriptor and stores the records in a tuple + * store. 
+ */ +void +ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor, + Tuplestorestate *tupstore) +{ + CopyState copyState = NULL; + + /* + * Trick BeginCopyFrom into using our tuple descriptor by pretending it belongs + * to a relation. + */ + Relation stubRelation = StubRelation(tupleDescriptor); + + EState *executorState = CreateExecutorState(); + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); + + int columnCount = tupleDescriptor->natts; + Datum *columnValues = palloc0(columnCount * sizeof(Datum)); + bool *columnNulls = palloc0(columnCount * sizeof(bool)); + + DefElem *copyOption = NULL; + List *copyOptions = NIL; + #if (PG_VERSION_NUM >= 100000) - copyState = BeginCopyFrom(NULL, stubRelation, taskFilename->data, false, NULL, - NULL, copyOptions); + int location = -1; /* "unknown" token location */ + copyOption = makeDefElem("format", (Node *) makeString(copyFormat), location); #else - copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL, - copyOptions); + copyOption = makeDefElem("format", (Node *) makeString(copyFormat)); +#endif + copyOptions = lappend(copyOptions, copyOption); + +#if (PG_VERSION_NUM >= 100000) + copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL, + NULL, copyOptions); +#else + copyState = BeginCopyFrom(stubRelation, fileName, false, NULL, + copyOptions); #endif - while (true) + while (true) + { + MemoryContext oldContext = NULL; + bool nextRowFound = false; + + ResetPerTupleExprContext(executorState); + oldContext = MemoryContextSwitchTo(executorTupleContext); + + nextRowFound = NextCopyFrom(copyState, executorExpressionContext, + columnValues, columnNulls, NULL); + if (!nextRowFound) { - MemoryContext oldContext = NULL; - bool nextRowFound = false; - - ResetPerTupleExprContext(executorState); - oldContext = MemoryContextSwitchTo(executorTupleContext); - - 
nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues, columnNulls, NULL); - if (!nextRowFound) - { - MemoryContextSwitchTo(oldContext); - break; - } - - tuplestore_putvalues(citusScanState->tuplestorestate, tupleDescriptor, - columnValues, columnNulls); MemoryContextSwitchTo(oldContext); + break; } - EndCopyFrom(copyState); + tuplestore_putvalues(tupstore, tupleDescriptor, columnValues, columnNulls); + MemoryContextSwitchTo(oldContext); } + + EndCopyFrom(copyState); + pfree(columnValues); + pfree(columnNulls); } @@ -195,3 +212,86 @@ StubRelation(TupleDesc tupleDescriptor) return stubRelation; } + + +/* + * ExecuteQueryStringIntoDestReceiver plans and executes a query and sends results + * to the given DestReceiver. + */ +void +ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, + DestReceiver *dest) +{ + Query *query = NULL; + +#if (PG_VERSION_NUM >= 100000) + RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); + List *queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL); +#else + Node *queryTreeNode = ParseTreeNode(queryString); + List *queryTreeList = pg_analyze_and_rewrite(queryTreeNode, queryString, NULL, 0); +#endif + + if (list_length(queryTreeList) != 1) + { + ereport(ERROR, (errmsg("can only execute a single query"))); + } + + query = (Query *) linitial(queryTreeList); + + ExecuteQueryIntoDestReceiver(query, params, dest); +} + + +/* + * ExecuteQueryIntoDestReceiver plans and executes a query and sends results to the given + * DestReceiver. 
+ */ +void +ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest) +{ + PlannedStmt *queryPlan = NULL; + int cursorOptions = 0; + + cursorOptions = CURSOR_OPT_PARALLEL_OK; + + /* plan the subquery, this may be another distributed query */ + queryPlan = pg_plan_query(query, cursorOptions, params); + + ExecutePlanIntoDestReceiver(queryPlan, params, dest); +} + + +/* + * ExecutePlanIntoDestReceiver executes a query plan and sends results to the given + * DestReceiver. + */ +void +ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, + DestReceiver *dest) +{ + Portal portal = NULL; + int eflags = 0; + long count = FETCH_ALL; + + /* create a new portal for executing the query */ + portal = CreateNewPortal(); + + /* don't display the portal in pg_cursors, it is for internal use only */ + portal->visible = false; + + PortalDefineQuery(portal, + NULL, + "", + "SELECT", + list_make1(queryPlan), + NULL); + + PortalStart(portal, params, eflags, GetActiveSnapshot()); +#if (PG_VERSION_NUM >= 100000) + PortalRun(portal, count, false, true, dest, dest, NULL); +#else + PortalRun(portal, count, false, dest, dest, NULL); +#endif + PortalDrop(portal, false); +} diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index c2fbff914..58aab1395 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -30,6 +30,15 @@ extern int MultiShardConnectionType; extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); +extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc + tupleDescriptor, Tuplestorestate *tupstore); +extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo + params, + DestReceiver *dest); +extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, + DestReceiver 
*dest); +extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, + DestReceiver *dest); #endif /* MULTI_EXECUTOR_H */