From cbecf97c84d04786815164f85225657540806023 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Tue, 27 Aug 2019 09:11:08 +0200 Subject: [PATCH] Move tuplestore setup to a helper function (#2898) * Add tuplestore helpers * More detailed error messages in tuplestore * Add CreateTupleDescCopy to SetupTuplestore * Use new SetupTuplestore helper function * Remove unnecessary copy * Remove comment about undefined behaviour --- .../executor/intermediate_results.c | 58 +------------ .../test/distributed_deadlock_detection.c | 39 +-------- src/backend/distributed/test/progress_utils.c | 36 +------- .../distributed/transaction/backend_data.c | 71 +-------------- .../transaction/citus_dist_stat_activity.c | 39 +-------- .../distributed/transaction/lock_graph.c | 39 +-------- src/backend/distributed/utils/tuplestore.c | 86 +++++++++++++++++++ src/include/distributed/tuplestore.h | 21 +++++ 8 files changed, 123 insertions(+), 266 deletions(-) create mode 100644 src/backend/distributed/utils/tuplestore.c create mode 100644 src/include/distributed/tuplestore.h diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index cec12ce34..50c017044 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -27,6 +27,7 @@ #include "distributed/remote_commands.h" #include "distributed/transmit.h" #include "distributed/transaction_identifier.h" +#include "distributed/tuplestore.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" @@ -705,8 +706,6 @@ IntermediateResultSize(char *resultId) Datum read_intermediate_result(PG_FUNCTION_ARGS) { - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - text *resultIdText = PG_GETARG_TEXT_P(0); char *resultIdString = text_to_cstring(resultIdText); Datum copyFormatOidDatum = PG_GETARG_DATUM(1); @@ -719,7 +718,6 @@ read_intermediate_result(PG_FUNCTION_ARGS) Tuplestorestate *tupstore = NULL; TupleDesc tupleDescriptor = NULL; - MemoryContext oldcontext = NULL; CheckCitusVersion(ERROR); @@ -731,59 +729,7 @@ read_intermediate_result(PG_FUNCTION_ARGS) errmsg("result \"%s\" does not exist", resultIdString))); } - /* check to see if query supports us returning a tuplestore */ - if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg( - "set-valued function called in context that cannot accept a set"))); - } - - if (!(rsinfo->allowedModes & SFRM_Materialize)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg( - "materialize mode required, but it is not allowed in this context"))); - } - - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupleDescriptor)) - { - case TYPEFUNC_COMPOSITE: - { - /* success */ - break; - } - - case TYPEFUNC_RECORD: - { - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - } - - default: - { - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } - } - - tupleDescriptor = CreateTupleDescCopy(tupleDescriptor); - - oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); - - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->returnMode = SFRM_Materialize; - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupleDescriptor; - MemoryContextSwitchTo(oldcontext); + tupstore = SetupTuplestore(fcinfo, &tupleDescriptor); ReadFileIntoTupleStore(resultFileName, copyFormatLabel, tupleDescriptor, tupstore); diff --git a/src/backend/distributed/test/distributed_deadlock_detection.c b/src/backend/distributed/test/distributed_deadlock_detection.c index 22f73c6a7..bde1f9f2d 100644 --- a/src/backend/distributed/test/distributed_deadlock_detection.c +++ b/src/backend/distributed/test/distributed_deadlock_detection.c @@ -21,6 +21,7 @@ #include "distributed/lock_graph.h" #include "distributed/metadata_cache.h" #include "distributed/transaction_identifier.h" +#include "distributed/tuplestore.h" #include "nodes/pg_list.h" #include "utils/hsearch.h" #include "utils/timestamp.h" @@ -38,11 +39,8 @@ PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph); Datum get_adjacency_list_wait_graph(PG_FUNCTION_ARGS) { - ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; - MemoryContext perQueryContext = NULL; - MemoryContext oldContext = NULL; WaitGraph *waitGraph = NULL; HTAB *adjacencyList = NULL; @@ -54,40 +52,7 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); - /* check to see if caller supports us returning a tuplestore */ - if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("set-valued function called in context " \ - "that cannot accept a set"))); - } - - if (!(returnSetInfo->allowedModes & SFRM_Materialize)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not " \ - "allowed in this context"))); - } - - /* build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE) - { - elog(ERROR, "return type must be a row type"); - } - - perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory; - - oldContext = MemoryContextSwitchTo(perQueryContext); - - tupleStore = tuplestore_begin_heap(true, false, work_mem); - returnSetInfo->returnMode = SFRM_Materialize; - returnSetInfo->setResult = tupleStore; - returnSetInfo->setDesc = tupleDescriptor; - - MemoryContextSwitchTo(oldContext); - + tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); waitGraph = BuildGlobalWaitGraph(); adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph); diff --git a/src/backend/distributed/test/progress_utils.c b/src/backend/distributed/test/progress_utils.c index c29c6ac15..426d2fb72 100644 --- a/src/backend/distributed/test/progress_utils.c +++ b/src/backend/distributed/test/progress_utils.c @@ -19,6 +19,7 @@ #include #include "distributed/multi_progress.h" +#include "distributed/tuplestore.h" #include "nodes/execnodes.h" #include "utils/tuplestore.h" @@ -85,40 +86,9 @@ show_progress(PG_FUNCTION_ARGS) uint64 magicNumber = PG_GETARG_INT64(0); List *attachedDSMSegments = NIL; List *monitorList = ProgressMonitorList(magicNumber, &attachedDSMSegments); - Tuplestorestate *tupstore = NULL; - TupleDesc tupdesc; - MemoryContext perQueryContext; - MemoryContext currentContext; - ReturnSetInfo *resultSet = (ReturnSetInfo *) fcinfo->resultinfo; ListCell *monitorCell = NULL; - - /* check to see if caller supports us returning a tuplestore */ - if (resultSet == NULL || !IsA(resultSet, ReturnSetInfo)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("set-valued function called in context that cannot " \ - "accept a set"))); - } - if (!(resultSet->allowedModes & SFRM_Materialize)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not " \ - "allowed in this context"))); - } - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - { - elog(ERROR, "return type must be a row type"); - } - - perQueryContext = resultSet->econtext->ecxt_per_query_memory; - currentContext = MemoryContextSwitchTo(perQueryContext); - tupstore = tuplestore_begin_heap(true, false, work_mem); - resultSet->returnMode = SFRM_Materialize; - resultSet->setResult = tupstore; - resultSet->setDesc = tupdesc; - MemoryContextSwitchTo(currentContext); + TupleDesc tupdesc; + Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupdesc); foreach(monitorCell, monitorList) { diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index cbc5d2d14..43ddd877c 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -25,6 +25,7 @@ #include "distributed/metadata_cache.h" #include "distributed/remote_commands.h" #include "distributed/transaction_identifier.h" +#include "distributed/tuplestore.h" #include "nodes/execnodes.h" #include "postmaster/autovacuum.h" /* to access autovacuum_max_workers */ #include "storage/ipc.h" @@ -62,7 +63,6 @@ typedef struct BackendManagementShmemData static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); -static void CheckReturnSetInfo(ReturnSetInfo *returnSetInfo); static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static BackendManagementShmemData *backendManagementShmemData = NULL; @@ -211,11 +211,8 @@ get_current_transaction_id(PG_FUNCTION_ARGS) Datum get_global_active_transactions(PG_FUNCTION_ARGS) { - ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; - MemoryContext perQueryContext = NULL; - MemoryContext oldContext = NULL; List *workerNodeList = ActivePrimaryNodeList(); ListCell *workerNodeCell = NULL; List *connectionList = NIL; @@ -223,27 +220,10 @@ get_global_active_transactions(PG_FUNCTION_ARGS) StringInfo queryToSend = makeStringInfo(); CheckCitusVersion(ERROR); - CheckReturnSetInfo(returnSetInfo); - - /* build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE) - { - elog(ERROR, "return type must be a row type"); - } + tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); appendStringInfo(queryToSend, GET_ACTIVE_TRANSACTION_QUERY); - perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory; - - oldContext = MemoryContextSwitchTo(perQueryContext); - - tupleStore = tuplestore_begin_heap(true, false, work_mem); - returnSetInfo->returnMode = SFRM_Materialize; - returnSetInfo->setResult = tupleStore; - returnSetInfo->setDesc = tupleDescriptor; - - MemoryContextSwitchTo(oldContext); - /* add active transactions for local node */ StoreAllActiveTransactions(tupleStore, tupleDescriptor); @@ -350,31 +330,12 @@ get_global_active_transactions(PG_FUNCTION_ARGS) Datum get_all_active_transactions(PG_FUNCTION_ARGS) { - ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; - MemoryContext perQueryContext = NULL; - MemoryContext oldContext = NULL; CheckCitusVersion(ERROR); - CheckReturnSetInfo(returnSetInfo); + tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); - /* build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE) - { - elog(ERROR, "return type must be a row type"); - } - - perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory; - - oldContext = MemoryContextSwitchTo(perQueryContext); - - tupleStore = tuplestore_begin_heap(true, false, work_mem); - returnSetInfo->returnMode = SFRM_Materialize; - returnSetInfo->setResult = tupleStore; - returnSetInfo->setDesc = tupleDescriptor; - - MemoryContextSwitchTo(oldContext); StoreAllActiveTransactions(tupleStore, tupleDescriptor); /* clean up and return the tuplestore */ @@ -486,32 +447,6 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto } -/* - * CheckReturnSetInfo checks whether the defined given returnSetInfo is - * proper for returning tuplestore. - */ -static void -CheckReturnSetInfo(ReturnSetInfo *returnSetInfo) -{ - /* check to see if caller supports us returning a tuplestore */ - if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("set-valued function called in context " \ - "that cannot accept a set"))); - } - - if (!(returnSetInfo->allowedModes & SFRM_Materialize)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not " \ - "allowed in this context"))); - } -} - - /* * InitializeBackendManagement requests the necessary shared memory * from Postgres and sets up the shared memory startup hook. diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 03157c4bd..9b93a8780 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -28,6 +28,7 @@ #include "distributed/metadata_cache.h" #include "distributed/remote_commands.h" #include "distributed/transaction_identifier.h" +#include "distributed/tuplestore.h" #include "executor/spi.h" #include "nodes/execnodes.h" #include "storage/ipc.h" @@ -974,44 +975,10 @@ ParseXIDField(PGresult *result, int rowIndex, int colIndex) static void ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo) { - ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo; - TupleDesc tupleDesc = NULL; - Tuplestorestate *tupleStore = NULL; - MemoryContext per_query_ctx = NULL; - MemoryContext oldContext = NULL; - ListCell *citusStatsCell = NULL; - /* check to see if caller supports us returning a tuplestore */ - if (resultInfo == NULL || !IsA(resultInfo, ReturnSetInfo)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg( - "set-valued function called in context that cannot accept a set"))); - } - if (!(resultInfo->allowedModes & SFRM_Materialize)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not " \ - "allowed in this context"))); - } - - /* Build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE) - { - elog(ERROR, "return type must be a row type"); - } - - per_query_ctx = resultInfo->econtext->ecxt_per_query_memory; - oldContext = MemoryContextSwitchTo(per_query_ctx); - - tupleStore = tuplestore_begin_heap(true, false, work_mem); - resultInfo->returnMode = SFRM_Materialize; - resultInfo->setResult = tupleStore; - resultInfo->setDesc = tupleDesc; - MemoryContextSwitchTo(oldContext); + TupleDesc tupleDesc; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc); foreach(citusStatsCell, citusStatsList) { diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index cee26d0fb..9c32189fc 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -24,6 +24,7 @@ #include "distributed/lock_graph.h" #include "distributed/metadata_cache.h" #include "distributed/remote_commands.h" +#include "distributed/tuplestore.h" #include "storage/proc.h" #include "utils/builtins.h" #include "utils/hsearch.h" @@ -285,43 +286,9 @@ dump_local_wait_edges(PG_FUNCTION_ARGS) static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) { - ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo; - TupleDesc tupleDesc = NULL; - Tuplestorestate *tupleStore = NULL; - MemoryContext per_query_ctx = NULL; - MemoryContext oldContext = NULL; size_t curEdgeNum = 0; - - /* check to see if caller supports us returning a tuplestore */ - if (resultInfo == NULL || !IsA(resultInfo, ReturnSetInfo)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg( - "set-valued function called in context that cannot accept a set"))); - } - if (!(resultInfo->allowedModes & SFRM_Materialize)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not " \ - "allowed in this context"))); - } - - /* Build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE) - { - elog(ERROR, "return type must be a row type"); - } - - per_query_ctx = resultInfo->econtext->ecxt_per_query_memory; - oldContext = MemoryContextSwitchTo(per_query_ctx); - - tupleStore = tuplestore_begin_heap(true, false, work_mem); - resultInfo->returnMode = SFRM_Materialize; - resultInfo->setResult = tupleStore; - resultInfo->setDesc = tupleDesc; - MemoryContextSwitchTo(oldContext); + TupleDesc tupleDesc; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDesc); /* * Columns: diff --git a/src/backend/distributed/utils/tuplestore.c b/src/backend/distributed/utils/tuplestore.c new file mode 100644 index 000000000..430a92c47 --- /dev/null +++ b/src/backend/distributed/utils/tuplestore.c @@ -0,0 +1,86 @@ +/*------------------------------------------------------------------------- + * + * tuplestore.c + * Utilities regarding calls to PG functions + * + * Copyright (c) 2012-2018, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + + +#include "postgres.h" + +#include "distributed/tuplestore.h" +#include "miscadmin.h" + +/* + * CheckTuplestoreReturn checks if a tuplestore can be returned in the callsite + * of the UDF. + */ +ReturnSetInfo * +CheckTuplestoreReturn(FunctionCallInfo fcinfo, TupleDesc *tupdesc) +{ + ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo; + + /* check to see if caller supports us returning a tuplestore */ + if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot " \ + "accept a set"))); + } + if (!(returnSetInfo->allowedModes & SFRM_Materialize)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + } + switch (get_call_result_type(fcinfo, NULL, tupdesc)) + { + case TYPEFUNC_COMPOSITE: + { + /* success */ + break; + } + + case TYPEFUNC_RECORD: + { + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + } + + default: + { + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + } + return returnSetInfo; +} + + +/* + * SetupTuplestore sets up a tuplestore for returning data. + */ +Tuplestorestate * +SetupTuplestore(FunctionCallInfo fcinfo, TupleDesc *tupleDescriptor) +{ + ReturnSetInfo *resultSet = CheckTuplestoreReturn(fcinfo, tupleDescriptor); + MemoryContext perQueryContext = resultSet->econtext->ecxt_per_query_memory; + + MemoryContext oldContext = MemoryContextSwitchTo(perQueryContext); + Tuplestorestate *tupstore = tuplestore_begin_heap(true, false, work_mem); + resultSet->returnMode = SFRM_Materialize; + resultSet->setResult = tupstore; + resultSet->setDesc = *tupleDescriptor; + MemoryContextSwitchTo(oldContext); + + return tupstore; +} diff --git a/src/include/distributed/tuplestore.h b/src/include/distributed/tuplestore.h new file mode 100644 index 000000000..40d5e7621 --- /dev/null +++ b/src/include/distributed/tuplestore.h @@ -0,0 +1,21 @@ +/*------------------------------------------------------------------------- + * + * tuplestore.h + * Utilities regarding calls to PG functions + * + * Copyright (c) 2012-2018, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + + +#ifndef CITUS_TUPLESTORE_H +#define CITUS_TUPLESTORE_H +#include "funcapi.h" + +/* Function declaration for getting oid for the given function name */ +extern +ReturnSetInfo * CheckTuplestoreReturn(FunctionCallInfo fcinfo, TupleDesc *tupdesc); + +extern +Tuplestorestate * SetupTuplestore(FunctionCallInfo fcinfo, TupleDesc *tupdesc); +#endif