Move tuplestore setup to a helper function (#2898)

* Add tuplestore helpers

* More detailed error messages in tuplestore

* Add CreateTupleDescCopy to SetupTuplestore

* Use new SetupTuplestore helper function

* Remove unnecessary copy

* Remove comment about undefined behaviour
pull/2910/head
Jelte Fennema 2019-08-27 09:11:08 +02:00 committed by GitHub
parent b354644c56
commit cbecf97c84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 123 additions and 266 deletions

View File

@ -27,6 +27,7 @@
#include "distributed/remote_commands.h"
#include "distributed/transmit.h"
#include "distributed/transaction_identifier.h"
#include "distributed/tuplestore.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "nodes/makefuncs.h"
@ -705,8 +706,6 @@ IntermediateResultSize(char *resultId)
Datum
read_intermediate_result(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
text *resultIdText = PG_GETARG_TEXT_P(0);
char *resultIdString = text_to_cstring(resultIdText);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
@ -719,7 +718,6 @@ read_intermediate_result(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore = NULL;
TupleDesc tupleDescriptor = NULL;
MemoryContext oldcontext = NULL;
CheckCitusVersion(ERROR);
@ -731,59 +729,7 @@ read_intermediate_result(PG_FUNCTION_ARGS)
errmsg("result \"%s\" does not exist", resultIdString)));
}
/* check to see if query supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"set-valued function called in context that cannot accept a set")));
}
if (!(rsinfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"materialize mode required, but it is not allowed in this context")));
}
/* get a tuple descriptor for our result type */
switch (get_call_result_type(fcinfo, NULL, &tupleDescriptor))
{
case TYPEFUNC_COMPOSITE:
{
/* success */
break;
}
case TYPEFUNC_RECORD:
{
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
}
default:
{
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}
}
tupleDescriptor = CreateTupleDescCopy(tupleDescriptor);
oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupleDescriptor;
MemoryContextSwitchTo(oldcontext);
tupstore = SetupTuplestore(fcinfo, &tupleDescriptor);
ReadFileIntoTupleStore(resultFileName, copyFormatLabel, tupleDescriptor, tupstore);

View File

@ -21,6 +21,7 @@
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/transaction_identifier.h"
#include "distributed/tuplestore.h"
#include "nodes/pg_list.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
@ -38,11 +39,8 @@ PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph);
Datum
get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
{
ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL;
MemoryContext perQueryContext = NULL;
MemoryContext oldContext = NULL;
WaitGraph *waitGraph = NULL;
HTAB *adjacencyList = NULL;
@ -54,40 +52,7 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
/* check to see if caller supports us returning a tuplestore */
if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context " \
"that cannot accept a set")));
}
if (!(returnSetInfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
/* build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory;
oldContext = MemoryContextSwitchTo(perQueryContext);
tupleStore = tuplestore_begin_heap(true, false, work_mem);
returnSetInfo->returnMode = SFRM_Materialize;
returnSetInfo->setResult = tupleStore;
returnSetInfo->setDesc = tupleDescriptor;
MemoryContextSwitchTo(oldContext);
tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
waitGraph = BuildGlobalWaitGraph();
adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph);

View File

@ -19,6 +19,7 @@
#include <unistd.h>
#include "distributed/multi_progress.h"
#include "distributed/tuplestore.h"
#include "nodes/execnodes.h"
#include "utils/tuplestore.h"
@ -85,40 +86,9 @@ show_progress(PG_FUNCTION_ARGS)
uint64 magicNumber = PG_GETARG_INT64(0);
List *attachedDSMSegments = NIL;
List *monitorList = ProgressMonitorList(magicNumber, &attachedDSMSegments);
Tuplestorestate *tupstore = NULL;
TupleDesc tupdesc;
MemoryContext perQueryContext;
MemoryContext currentContext;
ReturnSetInfo *resultSet = (ReturnSetInfo *) fcinfo->resultinfo;
ListCell *monitorCell = NULL;
/* check to see if caller supports us returning a tuplestore */
if (resultSet == NULL || !IsA(resultSet, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot " \
"accept a set")));
}
if (!(resultSet->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
perQueryContext = resultSet->econtext->ecxt_per_query_memory;
currentContext = MemoryContextSwitchTo(perQueryContext);
tupstore = tuplestore_begin_heap(true, false, work_mem);
resultSet->returnMode = SFRM_Materialize;
resultSet->setResult = tupstore;
resultSet->setDesc = tupdesc;
MemoryContextSwitchTo(currentContext);
TupleDesc tupdesc;
Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupdesc);
foreach(monitorCell, monitorList)
{

View File

@ -25,6 +25,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/transaction_identifier.h"
#include "distributed/tuplestore.h"
#include "nodes/execnodes.h"
#include "postmaster/autovacuum.h" /* to access autovacuum_max_workers */
#include "storage/ipc.h"
@ -62,7 +63,6 @@ typedef struct BackendManagementShmemData
static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor);
static void CheckReturnSetInfo(ReturnSetInfo *returnSetInfo);
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static BackendManagementShmemData *backendManagementShmemData = NULL;
@ -211,11 +211,8 @@ get_current_transaction_id(PG_FUNCTION_ARGS)
Datum
get_global_active_transactions(PG_FUNCTION_ARGS)
{
ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL;
MemoryContext perQueryContext = NULL;
MemoryContext oldContext = NULL;
List *workerNodeList = ActivePrimaryNodeList();
ListCell *workerNodeCell = NULL;
List *connectionList = NIL;
@ -223,27 +220,10 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
StringInfo queryToSend = makeStringInfo();
CheckCitusVersion(ERROR);
CheckReturnSetInfo(returnSetInfo);
/* build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
appendStringInfo(queryToSend, GET_ACTIVE_TRANSACTION_QUERY);
perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory;
oldContext = MemoryContextSwitchTo(perQueryContext);
tupleStore = tuplestore_begin_heap(true, false, work_mem);
returnSetInfo->returnMode = SFRM_Materialize;
returnSetInfo->setResult = tupleStore;
returnSetInfo->setDesc = tupleDescriptor;
MemoryContextSwitchTo(oldContext);
/* add active transactions for local node */
StoreAllActiveTransactions(tupleStore, tupleDescriptor);
@ -350,31 +330,12 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
Datum
get_all_active_transactions(PG_FUNCTION_ARGS)
{
ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL;
MemoryContext perQueryContext = NULL;
MemoryContext oldContext = NULL;
CheckCitusVersion(ERROR);
CheckReturnSetInfo(returnSetInfo);
tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
/* build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory;
oldContext = MemoryContextSwitchTo(perQueryContext);
tupleStore = tuplestore_begin_heap(true, false, work_mem);
returnSetInfo->returnMode = SFRM_Materialize;
returnSetInfo->setResult = tupleStore;
returnSetInfo->setDesc = tupleDescriptor;
MemoryContextSwitchTo(oldContext);
StoreAllActiveTransactions(tupleStore, tupleDescriptor);
/* clean up and return the tuplestore */
@ -486,32 +447,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
}
/*
* CheckReturnSetInfo checks whether the defined given returnSetInfo is
* proper for returning tuplestore.
*/
static void
CheckReturnSetInfo(ReturnSetInfo *returnSetInfo)
{
/* check to see if caller supports us returning a tuplestore */
if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context " \
"that cannot accept a set")));
}
if (!(returnSetInfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
}
/*
* InitializeBackendManagement requests the necessary shared memory
* from Postgres and sets up the shared memory startup hook.

View File

@ -28,6 +28,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/transaction_identifier.h"
#include "distributed/tuplestore.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "storage/ipc.h"
@ -974,44 +975,10 @@ ParseXIDField(PGresult *result, int rowIndex, int colIndex)
static void
ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo)
{
ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupleDesc = NULL;
Tuplestorestate *tupleStore = NULL;
MemoryContext per_query_ctx = NULL;
MemoryContext oldContext = NULL;
ListCell *citusStatsCell = NULL;
/* check to see if caller supports us returning a tuplestore */
if (resultInfo == NULL || !IsA(resultInfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"set-valued function called in context that cannot accept a set")));
}
if (!(resultInfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
per_query_ctx = resultInfo->econtext->ecxt_per_query_memory;
oldContext = MemoryContextSwitchTo(per_query_ctx);
tupleStore = tuplestore_begin_heap(true, false, work_mem);
resultInfo->returnMode = SFRM_Materialize;
resultInfo->setResult = tupleStore;
resultInfo->setDesc = tupleDesc;
MemoryContextSwitchTo(oldContext);
TupleDesc tupleDesc;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc);
foreach(citusStatsCell, citusStatsList)
{

View File

@ -24,6 +24,7 @@
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/tuplestore.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/hsearch.h"
@ -285,43 +286,9 @@ dump_local_wait_edges(PG_FUNCTION_ARGS)
static void
ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
{
ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupleDesc = NULL;
Tuplestorestate *tupleStore = NULL;
MemoryContext per_query_ctx = NULL;
MemoryContext oldContext = NULL;
size_t curEdgeNum = 0;
/* check to see if caller supports us returning a tuplestore */
if (resultInfo == NULL || !IsA(resultInfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"set-valued function called in context that cannot accept a set")));
}
if (!(resultInfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
per_query_ctx = resultInfo->econtext->ecxt_per_query_memory;
oldContext = MemoryContextSwitchTo(per_query_ctx);
tupleStore = tuplestore_begin_heap(true, false, work_mem);
resultInfo->returnMode = SFRM_Materialize;
resultInfo->setResult = tupleStore;
resultInfo->setDesc = tupleDesc;
MemoryContextSwitchTo(oldContext);
TupleDesc tupleDesc;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc);
/*
* Columns:

View File

@ -0,0 +1,86 @@
/*-------------------------------------------------------------------------
*
* tuplestore.c
* Utilities regarding calls to PG functions
*
* Copyright (c) 2012-2018, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/tuplestore.h"
#include "miscadmin.h"
/*
* CheckTuplestoreReturn checks if a tuplestore can be returned in the callsite
* of the UDF.
*/
ReturnSetInfo *
CheckTuplestoreReturn(FunctionCallInfo fcinfo, TupleDesc *tupdesc)
{
ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo;
/* check to see if caller supports us returning a tuplestore */
if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot " \
"accept a set")));
}
if (!(returnSetInfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
switch (get_call_result_type(fcinfo, NULL, tupdesc))
{
case TYPEFUNC_COMPOSITE:
{
/* success */
break;
}
case TYPEFUNC_RECORD:
{
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
}
default:
{
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}
}
return returnSetInfo;
}
/*
* SetupTuplestore sets up a tuplestore for returning data.
*/
Tuplestorestate *
SetupTuplestore(FunctionCallInfo fcinfo, TupleDesc *tupleDescriptor)
{
ReturnSetInfo *resultSet = CheckTuplestoreReturn(fcinfo, tupleDescriptor);
MemoryContext perQueryContext = resultSet->econtext->ecxt_per_query_memory;
MemoryContext oldContext = MemoryContextSwitchTo(perQueryContext);
Tuplestorestate *tupstore = tuplestore_begin_heap(true, false, work_mem);
resultSet->returnMode = SFRM_Materialize;
resultSet->setResult = tupstore;
resultSet->setDesc = *tupleDescriptor;
MemoryContextSwitchTo(oldContext);
return tupstore;
}

View File

@ -0,0 +1,21 @@
/*-------------------------------------------------------------------------
*
* tuplestore.h
* Utilities regarding calls to PG functions
*
* Copyright (c) 2012-2018, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef CITUS_TUPLESTORE_H
#define CITUS_TUPLESTORE_H
#include "funcapi.h"
/* Function declaration for getting oid for the given function name */
extern
ReturnSetInfo * CheckTuplestoreReturn(FunctionCallInfo fcinfo, TupleDesc *tupdesc);
extern
Tuplestorestate * SetupTuplestore(FunctionCallInfo fcinfo, TupleDesc *tupdesc);
#endif