From ef487e07921a3957bb35a2d030f8dd3497b70f79 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 16 Dec 2019 16:45:12 -0800 Subject: [PATCH] Implement fetch_intermediate_results --- src/backend/distributed/commands/multi_copy.c | 9 +- .../executor/intermediate_results.c | 243 ++++++++++++++++++ .../distributed/sql/citus--9.1-1--9.2-1.sql | 1 + .../udfs/fetch_intermediate_results/9.2-1.sql | 9 + .../fetch_intermediate_results/latest.sql | 9 + .../distributed/test/intermediate_results.c | 70 +++++ .../distributed/intermediate_results.h | 1 + src/test/regress/bin/normalize.sed | 3 + .../regress/expected/intermediate_results.out | 126 +++++++++ src/test/regress/sql/intermediate_results.sql | 51 ++++ 10 files changed, 521 insertions(+), 1 deletion(-) create mode 100644 src/backend/distributed/sql/udfs/fetch_intermediate_results/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/fetch_intermediate_results/latest.sql create mode 100644 src/backend/distributed/test/intermediate_results.c diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 1e7ee862d..6716f2505 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2735,7 +2735,14 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS { const char *resultId = copyStatement->relation->relname; - ReceiveQueryResultViaCopy(resultId); + if (copyStatement->is_from) + { + ReceiveQueryResultViaCopy(resultId); + } + else + { + SendQueryResultViaCopy(resultId); + } return NULL; } diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 688eef466..b8d3c2c62 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -23,6 +23,7 @@ #include "distributed/intermediate_results.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_client_executor.h" #include "distributed/multi_executor.h" #include "distributed/remote_commands.h" #include "distributed/transmit.h" @@ -96,12 +97,17 @@ static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, Datum *resultIdArray, int resultCount); +static uint64 FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId); +static CopyStatus CopyDataFromConnection(MultiConnection *connection, + FileCompat *fileCompat, + uint64 *bytesReceived); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(read_intermediate_result); PG_FUNCTION_INFO_V1(read_intermediate_result_array); PG_FUNCTION_INFO_V1(broadcast_intermediate_result); PG_FUNCTION_INFO_V1(create_intermediate_result); +PG_FUNCTION_INFO_V1(fetch_intermediate_results); /* @@ -511,6 +517,20 @@ RemoteFileDestReceiverDestroy(DestReceiver *destReceiver) } +/* + * SendQueryResultViaCopy is called when a COPY "resultid" TO STDOUT + * WITH (format result) command is received from the client. The + * contents of the file are sent directly to the client. + */ +void +SendQueryResultViaCopy(const char *resultId) +{ + const char *resultFileName = QueryResultFileName(resultId); + + SendRegularFile(resultFileName); +} + + /* * ReceiveQueryResultViaCopy is called when a COPY "resultid" FROM * STDIN WITH (format result) command is received from the client. @@ -769,3 +789,226 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, tuplestore_donestoring(tupleStore); } + + +/* + * fetch_intermediate_results fetches a set of intermediate results defined in an + * array of result IDs from a remote node and writes them to a local intermediate + * result with the same ID. + */ +Datum +fetch_intermediate_results(PG_FUNCTION_ARGS) +{ + ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0); + Datum *resultIdArray = DeconstructArrayObject(resultIdObject); + int32 resultCount = ArrayObjectCount(resultIdObject); + text *remoteHostText = PG_GETARG_TEXT_P(1); + char *remoteHost = text_to_cstring(remoteHostText); + int remotePort = PG_GETARG_INT32(2); + + int connectionFlags = 0; + int resultIndex = 0; + int64 totalBytesWritten = 0L; + + CheckCitusVersion(ERROR); + + if (resultCount == 0) + { + PG_RETURN_INT64(0); + } + + if (!IsMultiStatementTransaction()) + { + ereport(ERROR, (errmsg("fetch_intermediate_results can only be used in a " + "distributed transaction"))); + } + + /* + * Make sure that this transaction has a distributed transaction ID. + * + * Intermediate results will be stored in a directory that is derived + * from the distributed transaction ID. + */ + UseCoordinatedTransaction(); + + MultiConnection *connection = GetNodeConnection(connectionFlags, remoteHost, + remotePort); + + if (PQstatus(connection->pgConn) != CONNECTION_OK) + { + ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch intermediate " + "results", + remoteHost, remotePort))); + } + + RemoteTransactionBegin(connection); + + for (resultIndex = 0; resultIndex < resultCount; resultIndex++) + { + char *resultId = TextDatumGetCString(resultIdArray[resultIndex]); + + totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId); + } + + RemoteTransactionCommit(connection); + CloseConnection(connection); + + PG_RETURN_INT64(totalBytesWritten); +} + + +/* + * FetchRemoteIntermediateResult fetches a remote intermediate result over + * the given connection. + */ +static uint64 +FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId) +{ + uint64 totalBytesWritten = 0; + + StringInfo copyCommand = makeStringInfo(); + const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); + const int fileMode = (S_IRUSR | S_IWUSR); + + PGconn *pgConn = connection->pgConn; + int socket = PQsocket(pgConn); + bool raiseErrors = true; + + CreateIntermediateResultsDirectory(); + + appendStringInfo(copyCommand, "COPY \"%s\" TO STDOUT WITH (format result)", + resultId); + + if (!SendRemoteCommand(connection, copyCommand->data)) + { + ReportConnectionError(connection, ERROR); + } + + PGresult *result = GetRemoteCommandResult(connection, raiseErrors); + if (PQresultStatus(result) != PGRES_COPY_OUT) + { + ReportResultError(connection, result, ERROR); + } + + PQclear(result); + + char *localPath = QueryResultFileName(resultId); + File fileDesc = FileOpenForTransmit(localPath, fileFlags, fileMode); + FileCompat fileCompat = FileCompatFromFileStart(fileDesc); + + while (true) + { + int waitFlags = WL_SOCKET_READABLE; + + CopyStatus copyStatus = CopyDataFromConnection(connection, &fileCompat, + &totalBytesWritten); + if (copyStatus == CLIENT_COPY_FAILED) + { + ereport(ERROR, (errmsg("failed to read result \"%s\" from node %s:%d", + resultId, connection->hostname, connection->port))); + } + else if (copyStatus == CLIENT_COPY_DONE) + { + break; + } + + Assert(copyStatus == CLIENT_COPY_MORE); + + int rc = WaitLatchOrSocket(MyLatch, waitFlags, socket, 0, PG_WAIT_EXTENSION); + if (rc & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } + + FileClose(fileDesc); + + ClearResults(connection, raiseErrors); + + return totalBytesWritten; +} + + +/* + * CopyDataFromConnection reads a row of copy data from connection and writes it + * to the given file. + */ +static CopyStatus +CopyDataFromConnection(MultiConnection *connection, FileCompat *fileCompat, + uint64 *bytesReceived) +{ + /* + * Consume input to handle the case where previous copy operation might have + * received zero bytes. + */ + int consumed = PQconsumeInput(connection->pgConn); + if (consumed == 0) + { + return CLIENT_COPY_FAILED; + } + + /* receive copy data message in an asynchronous manner */ + char *receiveBuffer = NULL; + bool asynchronous = true; + int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); + while (receiveLength > 0) + { + /* received copy data; append these data to file */ + errno = 0; + + int bytesWritten = FileWriteCompat(fileCompat, receiveBuffer, + receiveLength, PG_WAIT_IO); + if (bytesWritten != receiveLength) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not append to file: %m"))); + } + + *bytesReceived += receiveLength; + PQfreemem(receiveBuffer); + receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); + } + + if (receiveLength == 0) + { + /* we cannot read more data without blocking */ + return CLIENT_COPY_MORE; + } + else if (receiveLength == -1) + { + /* received copy done message */ + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + ExecStatusType resultStatus = PQresultStatus(result); + CopyStatus copyStatus = 0; + + if (resultStatus == PGRES_COMMAND_OK) + { + copyStatus = CLIENT_COPY_DONE; + } + else + { + copyStatus = CLIENT_COPY_FAILED; + + ReportResultError(connection, result, WARNING); + } + + PQclear(result); + ForgetResults(connection); + + return copyStatus; + } + else + { + Assert(receiveLength == -2); + ReportConnectionError(connection, WARNING); + + return CLIENT_COPY_FAILED; + } +} diff --git a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql index 4149f26f8..5cdc5bf22 100644 --- a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql +++ b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql @@ -1,4 +1,5 @@ #include "udfs/read_intermediate_results/9.2-1.sql" +#include "udfs/fetch_intermediate_results/9.2-1.sql" ALTER TABLE pg_catalog.pg_dist_colocation ADD distributioncolumncollation oid; UPDATE pg_catalog.pg_dist_colocation dc SET distributioncolumncollation = t.typcollation diff --git a/src/backend/distributed/sql/udfs/fetch_intermediate_results/9.2-1.sql b/src/backend/distributed/sql/udfs/fetch_intermediate_results/9.2-1.sql new file mode 100644 index 000000000..ca4e85481 --- /dev/null +++ b/src/backend/distributed/sql/udfs/fetch_intermediate_results/9.2-1.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION pg_catalog.fetch_intermediate_results( + result_ids text[], + node_name text, + node_port int) +RETURNS bigint +LANGUAGE C STRICT VOLATILE +AS 'MODULE_PATHNAME', $$fetch_intermediate_results$$; +COMMENT ON FUNCTION pg_catalog.fetch_intermediate_results(text[],text,int) +IS 'fetch array of intermediate results from a remote node. returns number of bytes read.'; diff --git a/src/backend/distributed/sql/udfs/fetch_intermediate_results/latest.sql b/src/backend/distributed/sql/udfs/fetch_intermediate_results/latest.sql new file mode 100644 index 000000000..ca4e85481 --- /dev/null +++ b/src/backend/distributed/sql/udfs/fetch_intermediate_results/latest.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION pg_catalog.fetch_intermediate_results( + result_ids text[], + node_name text, + node_port int) +RETURNS bigint +LANGUAGE C STRICT VOLATILE +AS 'MODULE_PATHNAME', $$fetch_intermediate_results$$; +COMMENT ON FUNCTION pg_catalog.fetch_intermediate_results(text[],text,int) +IS 'fetch array of intermediate results from a remote node. returns number of bytes read.'; diff --git a/src/backend/distributed/test/intermediate_results.c b/src/backend/distributed/test/intermediate_results.c new file mode 100644 index 000000000..13f1e1b9f --- /dev/null +++ b/src/backend/distributed/test/intermediate_results.c @@ -0,0 +1,70 @@ +/*------------------------------------------------------------------------- + * + * test/src/intermediate_results.c + * + * This file contains functions to test functions related to + * src/backend/distributed/executor/intermediate_results.c. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#include +#include + +#include "postgres.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "distributed/commands/multi_copy.h" +#include "distributed/connection_management.h" +#include "distributed/intermediate_results.h" +#include "distributed/multi_executor.h" +#include "distributed/remote_commands.h" + +PG_FUNCTION_INFO_V1(store_intermediate_result_on_node); + + +/* + * store_intermediate_result_on_node executes a query and streams the results + * into a file on the given node. + */ +Datum +store_intermediate_result_on_node(PG_FUNCTION_ARGS) +{ + text *nodeNameText = PG_GETARG_TEXT_P(0); + char *nodeNameString = text_to_cstring(nodeNameText); + int nodePort = PG_GETARG_INT32(1); + text *resultIdText = PG_GETARG_TEXT_P(2); + char *resultIdString = text_to_cstring(resultIdText); + text *queryText = PG_GETARG_TEXT_P(3); + char *queryString = text_to_cstring(queryText); + bool writeLocalFile = false; + ParamListInfo paramListInfo = NULL; + + CheckCitusVersion(ERROR); + + WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort); + + /* + * Make sure that this transaction has a distributed transaction ID. + * + * Intermediate results will be stored in a directory that is derived + * from the distributed transaction ID. + */ + UseCoordinatedTransaction(); + + EState *estate = CreateExecutorState(); + DestReceiver *resultDest = CreateRemoteFileDestReceiver(resultIdString, estate, + list_make1(workerNode), + writeLocalFile); + + ExecuteQueryStringIntoDestReceiver(queryString, paramListInfo, + (DestReceiver *) resultDest); + + FreeExecutorState(estate); + + PG_RETURN_VOID(); +} diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index e020a8635..136a8c541 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -25,6 +25,7 @@ extern DestReceiver * CreateRemoteFileDestReceiver(char *resultId, EState *executorState, List *initialNodeList, bool writeLocalFile); +extern void SendQueryResultViaCopy(const char *resultId); extern void ReceiveQueryResultViaCopy(const char *resultId); extern void RemoveIntermediateResultsDirectory(void); extern int64 IntermediateResultSize(char *resultId); diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index a4c292713..a4a4091f5 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -73,3 +73,6 @@ s/_id_ref_id_fkey/_id_fkey/g s/_ref_id_id_fkey_/_ref_id_fkey_/g s/fk_test_2_col1_col2_fkey/fk_test_2_col1_fkey/g s/_id_other_column_ref_fkey/_id_fkey/g + +# intermediate_results +s/(ERROR.*)pgsql_job_cache\/([0-9]+_[0-9]+_[0-9]+)\/(.*).data/\1pgsql_job_cache\/xx_x_xxx\/\3.data/g diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index fd2995697..b97cac425 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -1,6 +1,11 @@ -- Test functions for copying intermediate results CREATE SCHEMA intermediate_results; SET search_path TO 'intermediate_results'; +-- helper udfs +CREATE OR REPLACE FUNCTION pg_catalog.store_intermediate_result_on_node(nodename text, nodeport int, result_id text, query text) + RETURNS void + LANGUAGE C STRICT VOLATILE + AS 'citus', $$store_intermediate_result_on_node$$; -- in the same transaction we can read a result BEGIN; SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); @@ -413,6 +418,127 @@ EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['stored_squares (1 row) END; +-- +-- fetch_intermediate_results +-- +-- straightforward, single-result case +BEGIN; +SELECT broadcast_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1, 5) s'); + broadcast_intermediate_result +------------------------------- + 5 +(1 row) + +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1']::text[], 'localhost', :worker_2_port); + fetch_intermediate_results +---------------------------- + 111 +(1 row) + +SELECT * FROM read_intermediate_result('squares_1', 'binary') AS res (x int, x2 int); + x | x2 +---+---- + 1 | 1 + 2 | 4 + 3 | 9 + 4 | 16 + 5 | 25 +(5 rows) + +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1']::text[], 'localhost', :worker_1_port); + fetch_intermediate_results +---------------------------- + 111 +(1 row) + +SELECT * FROM read_intermediate_result('squares_1', 'binary') AS res (x int, x2 int); + x | x2 +---+---- + 1 | 1 + 2 | 4 + 3 | 9 + 4 | 16 + 5 | 25 +(5 rows) + +END; +-- multiple results, and some error cases +BEGIN; +SELECT store_intermediate_result_on_node('localhost', :worker_1_port, + 'squares_1', 'SELECT s, s*s FROM generate_series(1, 2) s'); + store_intermediate_result_on_node +----------------------------------- + +(1 row) + +SELECT store_intermediate_result_on_node('localhost', :worker_1_port, + 'squares_2', 'SELECT s, s*s FROM generate_series(3, 4) s'); + store_intermediate_result_on_node +----------------------------------- + +(1 row) + +SAVEPOINT s1; +-- results aren't available on coordinator yet +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); +ERROR: result "squares_1" does not exist +ROLLBACK TO SAVEPOINT s1; +-- fetch from worker 2 should fail +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port); +ERROR: could not open file "base/pgsql_job_cache/10_0_200/squares_1.data": No such file or directory +CONTEXT: while executing command on localhost:57638 +ROLLBACK TO SAVEPOINT s1; +-- still, results aren't available on coordinator yet +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); +ERROR: result "squares_1" does not exist +ROLLBACK TO SAVEPOINT s1; +-- fetch from worker 1 should succeed +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port); + fetch_intermediate_results +---------------------------- + 114 +(1 row) + +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); + x | x2 +---+---- + 1 | 1 + 2 | 4 + 3 | 9 + 4 | 16 +(4 rows) + +-- fetching again should succeed +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port); + fetch_intermediate_results +---------------------------- + 114 +(1 row) + +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); + x | x2 +---+---- + 1 | 1 + 2 | 4 + 3 | 9 + 4 | 16 +(4 rows) + +ROLLBACK TO SAVEPOINT s1; +-- empty result id list should succeed +SELECT * FROM fetch_intermediate_results(ARRAY[]::text[], 'localhost', :worker_1_port); + fetch_intermediate_results +---------------------------- + 0 +(1 row) + +-- null in result id list should error gracefully +SELECT * FROM fetch_intermediate_results(ARRAY[NULL, 'squares_1', 'squares_2']::text[], 'localhost', :worker_1_port); +ERROR: worker array object cannot contain null values +END; +-- results should have been deleted after transaction commit +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); +ERROR: result "squares_1" does not exist DROP SCHEMA intermediate_results CASCADE; NOTICE: drop cascades to 5 other objects DETAIL: drop cascades to table interesting_squares diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index cdb5797b9..c421c3da9 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -2,6 +2,12 @@ CREATE SCHEMA intermediate_results; SET search_path TO 'intermediate_results'; +-- helper udfs +CREATE OR REPLACE FUNCTION pg_catalog.store_intermediate_result_on_node(nodename text, nodeport int, result_id text, query text) + RETURNS void + LANGUAGE C STRICT VOLATILE + AS 'citus', $$store_intermediate_result_on_node$$; + -- in the same transaction we can read a result BEGIN; SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); @@ -218,4 +224,49 @@ SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_s EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['stored_squares'], 'text') AS res (s intermediate_results.square_type); END; +-- +-- fetch_intermediate_results +-- + +-- straightforward, single-result case +BEGIN; +SELECT broadcast_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1, 5) s'); +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1']::text[], 'localhost', :worker_2_port); +SELECT * FROM read_intermediate_result('squares_1', 'binary') AS res (x int, x2 int); +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1']::text[], 'localhost', :worker_1_port); +SELECT * FROM read_intermediate_result('squares_1', 'binary') AS res (x int, x2 int); +END; + +-- multiple results, and some error cases +BEGIN; +SELECT store_intermediate_result_on_node('localhost', :worker_1_port, + 'squares_1', 'SELECT s, s*s FROM generate_series(1, 2) s'); +SELECT store_intermediate_result_on_node('localhost', :worker_1_port, + 'squares_2', 'SELECT s, s*s FROM generate_series(3, 4) s'); +SAVEPOINT s1; +-- results aren't available on coordinator yet +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); +ROLLBACK TO SAVEPOINT s1; +-- fetch from worker 2 should fail +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port); +ROLLBACK TO SAVEPOINT s1; +-- still, results aren't available on coordinator yet +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); +ROLLBACK TO SAVEPOINT s1; +-- fetch from worker 1 should succeed +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port); +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); +-- fetching again should succeed +SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port); +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); +ROLLBACK TO SAVEPOINT s1; +-- empty result id list should succeed +SELECT * FROM fetch_intermediate_results(ARRAY[]::text[], 'localhost', :worker_1_port); +-- null in result id list should error gracefully +SELECT * FROM fetch_intermediate_results(ARRAY[NULL, 'squares_1', 'squares_2']::text[], 'localhost', :worker_1_port); +END; + +-- results should have been deleted after transaction commit +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); + DROP SCHEMA intermediate_results CASCADE;