Merge pull request #3318 from citusdata/fetch_intermediate_results

Implement fetch_intermediate_results
pull/3325/head
Hadi Moshayedi 2019-12-18 10:51:56 -08:00 committed by GitHub
commit c9ceff7d78
10 changed files with 521 additions and 1 deletion

View File

@@ -2735,7 +2735,14 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
{
const char *resultId = copyStatement->relation->relname;
-ReceiveQueryResultViaCopy(resultId);
+if (copyStatement->is_from)
+{
+ReceiveQueryResultViaCopy(resultId);
+}
+else
+{
+SendQueryResultViaCopy(resultId);
+}
return NULL;
}
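
For context, these two branches handle the internal COPY commands that Citus nodes send to each other for intermediate results. A sketch of both forms, with a hypothetical result ID of squares_1:

-- push a local result file to the server (ReceiveQueryResultViaCopy)
COPY "squares_1" FROM STDIN WITH (format result);
-- stream the server's result file back to the client, the new path (SendQueryResultViaCopy)
COPY "squares_1" TO STDOUT WITH (format result);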

View File

@@ -23,6 +23,7 @@
#include "distributed/intermediate_results.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/transmit.h"
@@ -96,12 +97,17 @@ static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo,
char *copyFormat,
Datum *resultIdArray,
int resultCount);
static uint64 FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId);
static CopyStatus CopyDataFromConnection(MultiConnection *connection,
FileCompat *fileCompat,
uint64 *bytesReceived);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(read_intermediate_result);
PG_FUNCTION_INFO_V1(read_intermediate_result_array);
PG_FUNCTION_INFO_V1(broadcast_intermediate_result);
PG_FUNCTION_INFO_V1(create_intermediate_result);
PG_FUNCTION_INFO_V1(fetch_intermediate_results);
/*
@@ -511,6 +517,20 @@ RemoteFileDestReceiverDestroy(DestReceiver *destReceiver)
}
/*
* SendQueryResultViaCopy is called when a COPY "resultid" TO STDOUT
* WITH (format result) command is received from the client. The
* contents of the file are sent directly to the client.
*/
void
SendQueryResultViaCopy(const char *resultId)
{
const char *resultFileName = QueryResultFileName(resultId);
SendRegularFile(resultFileName);
}
/*
* ReceiveQueryResultViaCopy is called when a COPY "resultid" FROM
* STDIN WITH (format result) command is received from the client.
@@ -769,3 +789,226 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
tuplestore_donestoring(tupleStore);
}
/*
* fetch_intermediate_results fetches a set of intermediate results defined in an
* array of result IDs from a remote node and writes them to a local intermediate
* result with the same ID.
*/
Datum
fetch_intermediate_results(PG_FUNCTION_ARGS)
{
ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0);
Datum *resultIdArray = DeconstructArrayObject(resultIdObject);
int32 resultCount = ArrayObjectCount(resultIdObject);
text *remoteHostText = PG_GETARG_TEXT_P(1);
char *remoteHost = text_to_cstring(remoteHostText);
int remotePort = PG_GETARG_INT32(2);
int connectionFlags = 0;
int resultIndex = 0;
int64 totalBytesWritten = 0L;
CheckCitusVersion(ERROR);
if (resultCount == 0)
{
PG_RETURN_INT64(0);
}
if (!IsMultiStatementTransaction())
{
ereport(ERROR, (errmsg("fetch_intermediate_results can only be used in a "
"distributed transaction")));
}
/*
* Make sure that this transaction has a distributed transaction ID.
*
* Intermediate results will be stored in a directory that is derived
* from the distributed transaction ID.
*/
UseCoordinatedTransaction();
MultiConnection *connection = GetNodeConnection(connectionFlags, remoteHost,
remotePort);
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch intermediate "
"results",
remoteHost, remotePort)));
}
RemoteTransactionBegin(connection);
for (resultIndex = 0; resultIndex < resultCount; resultIndex++)
{
char *resultId = TextDatumGetCString(resultIdArray[resultIndex]);
totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId);
}
RemoteTransactionCommit(connection);
CloseConnection(connection);
PG_RETURN_INT64(totalBytesWritten);
}
/*
* FetchRemoteIntermediateResult fetches a remote intermediate result over
* the given connection.
*/
static uint64
FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId)
{
uint64 totalBytesWritten = 0;
StringInfo copyCommand = makeStringInfo();
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
const int fileMode = (S_IRUSR | S_IWUSR);
PGconn *pgConn = connection->pgConn;
int socket = PQsocket(pgConn);
bool raiseErrors = true;
CreateIntermediateResultsDirectory();
appendStringInfo(copyCommand, "COPY \"%s\" TO STDOUT WITH (format result)",
resultId);
if (!SendRemoteCommand(connection, copyCommand->data))
{
ReportConnectionError(connection, ERROR);
}
PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
if (PQresultStatus(result) != PGRES_COPY_OUT)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
char *localPath = QueryResultFileName(resultId);
File fileDesc = FileOpenForTransmit(localPath, fileFlags, fileMode);
FileCompat fileCompat = FileCompatFromFileStart(fileDesc);
while (true)
{
int waitFlags = WL_SOCKET_READABLE | WL_LATCH_SET | WL_POSTMASTER_DEATH;
CopyStatus copyStatus = CopyDataFromConnection(connection, &fileCompat,
&totalBytesWritten);
if (copyStatus == CLIENT_COPY_FAILED)
{
ereport(ERROR, (errmsg("failed to read result \"%s\" from node %s:%d",
resultId, connection->hostname, connection->port)));
}
else if (copyStatus == CLIENT_COPY_DONE)
{
break;
}
Assert(copyStatus == CLIENT_COPY_MORE);
int rc = WaitLatchOrSocket(MyLatch, waitFlags, socket, 0, PG_WAIT_EXTENSION);
if (rc & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
}
FileClose(fileDesc);
ClearResults(connection, raiseErrors);
return totalBytesWritten;
}
/*
* CopyDataFromConnection reads a row of copy data from connection and writes it
* to the given file.
*/
static CopyStatus
CopyDataFromConnection(MultiConnection *connection, FileCompat *fileCompat,
uint64 *bytesReceived)
{
/*
* Consume input to handle the case where previous copy operation might have
* received zero bytes.
*/
int consumed = PQconsumeInput(connection->pgConn);
if (consumed == 0)
{
return CLIENT_COPY_FAILED;
}
/* receive copy data message in an asynchronous manner */
char *receiveBuffer = NULL;
bool asynchronous = true;
int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
while (receiveLength > 0)
{
/* received copy data; append these data to file */
errno = 0;
int bytesWritten = FileWriteCompat(fileCompat, receiveBuffer,
receiveLength, PG_WAIT_IO);
if (bytesWritten != receiveLength)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not append to file: %m")));
}
*bytesReceived += receiveLength;
PQfreemem(receiveBuffer);
receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
}
if (receiveLength == 0)
{
/* we cannot read more data without blocking */
return CLIENT_COPY_MORE;
}
else if (receiveLength == -1)
{
/* received copy done message */
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
ExecStatusType resultStatus = PQresultStatus(result);
CopyStatus copyStatus = 0;
if (resultStatus == PGRES_COMMAND_OK)
{
copyStatus = CLIENT_COPY_DONE;
}
else
{
copyStatus = CLIENT_COPY_FAILED;
ReportResultError(connection, result, WARNING);
}
PQclear(result);
ForgetResults(connection);
return copyStatus;
}
else
{
Assert(receiveLength == -2);
ReportConnectionError(connection, WARNING);
return CLIENT_COPY_FAILED;
}
}
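
Taken together with the existing UDFs, the intended flow is roughly the following sketch; localhost:9700 is a hypothetical worker address, and the regression tests below run the same sequence against real workers:

BEGIN;
-- write a result to every worker
SELECT broadcast_intermediate_result('squares_1',
    'SELECT s, s*s FROM generate_series(1, 5) s');
-- copy it from one worker into this node's intermediate result directory
SELECT fetch_intermediate_results(ARRAY['squares_1'], 'localhost', 9700);
-- the result is now readable locally
SELECT * FROM read_intermediate_result('squares_1', 'binary') AS res (x int, x2 int);
COMMIT;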

View File

@@ -1,4 +1,5 @@
#include "udfs/read_intermediate_results/9.2-1.sql"
#include "udfs/fetch_intermediate_results/9.2-1.sql"
ALTER TABLE pg_catalog.pg_dist_colocation ADD distributioncolumncollation oid;
UPDATE pg_catalog.pg_dist_colocation dc SET distributioncolumncollation = t.typcollation

View File

@@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.fetch_intermediate_results(
result_ids text[],
node_name text,
node_port int)
RETURNS bigint
LANGUAGE C STRICT VOLATILE
AS 'MODULE_PATHNAME', $$fetch_intermediate_results$$;
COMMENT ON FUNCTION pg_catalog.fetch_intermediate_results(text[],text,int)
IS 'fetch array of intermediate results from a remote node. returns number of bytes read.';

View File

@@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.fetch_intermediate_results(
result_ids text[],
node_name text,
node_port int)
RETURNS bigint
LANGUAGE C STRICT VOLATILE
AS 'MODULE_PATHNAME', $$fetch_intermediate_results$$;
COMMENT ON FUNCTION pg_catalog.fetch_intermediate_results(text[],text,int)
IS 'fetch array of intermediate results from a remote node. returns number of bytes read.';
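
A note on calling this directly: it must run inside a multi-statement transaction (see the IsMultiStatementTransaction check above), because the local result file lives in a directory derived from the distributed transaction ID. A minimal sketch, again with a hypothetical worker at localhost:9700:

-- outside a transaction block:
-- ERROR: fetch_intermediate_results can only be used in a distributed transaction
SELECT fetch_intermediate_results(ARRAY['squares_1'], 'localhost', 9700);
-- inside one, it returns the number of bytes written locally
BEGIN;
SELECT fetch_intermediate_results(ARRAY['squares_1'], 'localhost', 9700);
COMMIT;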

View File

@@ -0,0 +1,70 @@
/*-------------------------------------------------------------------------
*
* test/src/intermediate_results.c
*
* This file contains functions to test functions related to
* src/backend/distributed/executor/intermediate_results.c.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include <sys/stat.h>
#include <unistd.h>
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/connection_management.h"
#include "distributed/intermediate_results.h"
#include "distributed/multi_executor.h"
#include "distributed/remote_commands.h"
PG_FUNCTION_INFO_V1(store_intermediate_result_on_node);
/*
* store_intermediate_result_on_node executes a query and streams the results
* into a file on the given node.
*/
Datum
store_intermediate_result_on_node(PG_FUNCTION_ARGS)
{
text *nodeNameText = PG_GETARG_TEXT_P(0);
char *nodeNameString = text_to_cstring(nodeNameText);
int nodePort = PG_GETARG_INT32(1);
text *resultIdText = PG_GETARG_TEXT_P(2);
char *resultIdString = text_to_cstring(resultIdText);
text *queryText = PG_GETARG_TEXT_P(3);
char *queryString = text_to_cstring(queryText);
bool writeLocalFile = false;
ParamListInfo paramListInfo = NULL;
CheckCitusVersion(ERROR);
WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort);
/*
* Make sure that this transaction has a distributed transaction ID.
*
* Intermediate results will be stored in a directory that is derived
* from the distributed transaction ID.
*/
UseCoordinatedTransaction();
EState *estate = CreateExecutorState();
DestReceiver *resultDest = CreateRemoteFileDestReceiver(resultIdString, estate,
list_make1(workerNode),
writeLocalFile);
ExecuteQueryStringIntoDestReceiver(queryString, paramListInfo,
(DestReceiver *) resultDest);
FreeExecutorState(estate);
PG_RETURN_VOID();
}
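
Since writeLocalFile is false and the node list contains a single worker, the result file is created only on that worker, which is what makes the fetch tests below meaningful. A usage sketch (the port variable is supplied by the regression harness):

BEGIN;
SELECT store_intermediate_result_on_node('localhost', :worker_1_port,
    'squares_1', 'SELECT s, s*s FROM generate_series(1, 2) s');
-- the result now exists on worker 1, but not here until fetched
END;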

View File

@@ -25,6 +25,7 @@
extern DestReceiver * CreateRemoteFileDestReceiver(char *resultId, EState *executorState,
List *initialNodeList, bool
writeLocalFile);
extern void SendQueryResultViaCopy(const char *resultId);
extern void ReceiveQueryResultViaCopy(const char *resultId);
extern void RemoveIntermediateResultsDirectory(void);
extern int64 IntermediateResultSize(char *resultId);

View File

@@ -73,3 +73,6 @@ s/_id_ref_id_fkey/_id_fkey/g
s/_ref_id_id_fkey_/_ref_id_fkey_/g
s/fk_test_2_col1_col2_fkey/fk_test_2_col1_fkey/g
s/_id_other_column_ref_fkey/_id_fkey/g
# intermediate_results
s/(ERROR.*)pgsql_job_cache\/([0-9]+_[0-9]+_[0-9]+)\/(.*).data/\1pgsql_job_cache\/xx_x_xxx\/\3.data/g
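This rule masks the distributed-transaction-ID directory in error messages, since it varies from run to run. For example, it rewrites

ERROR: could not open file "base/pgsql_job_cache/10_0_200/squares_1.data": No such file or directory

into

ERROR: could not open file "base/pgsql_job_cache/xx_x_xxx/squares_1.data": No such file or directory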

View File

@@ -1,6 +1,11 @@
-- Test functions for copying intermediate results
CREATE SCHEMA intermediate_results;
SET search_path TO 'intermediate_results';
-- helper udfs
CREATE OR REPLACE FUNCTION pg_catalog.store_intermediate_result_on_node(nodename text, nodeport int, result_id text, query text)
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'citus', $$store_intermediate_result_on_node$$;
-- in the same transaction we can read a result
BEGIN;
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
@@ -413,6 +418,127 @@ EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['stored_squares
(1 row)
END;
--
-- fetch_intermediate_results
--
-- straightforward, single-result case
BEGIN;
SELECT broadcast_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1, 5) s');
broadcast_intermediate_result
-------------------------------
5
(1 row)
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1']::text[], 'localhost', :worker_2_port);
fetch_intermediate_results
----------------------------
111
(1 row)
SELECT * FROM read_intermediate_result('squares_1', 'binary') AS res (x int, x2 int);
x | x2
---+----
1 | 1
2 | 4
3 | 9
4 | 16
5 | 25
(5 rows)
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1']::text[], 'localhost', :worker_1_port);
fetch_intermediate_results
----------------------------
111
(1 row)
SELECT * FROM read_intermediate_result('squares_1', 'binary') AS res (x int, x2 int);
x | x2
---+----
1 | 1
2 | 4
3 | 9
4 | 16
5 | 25
(5 rows)
END;
-- multiple results, and some error cases
BEGIN;
SELECT store_intermediate_result_on_node('localhost', :worker_1_port,
'squares_1', 'SELECT s, s*s FROM generate_series(1, 2) s');
store_intermediate_result_on_node
-----------------------------------
(1 row)
SELECT store_intermediate_result_on_node('localhost', :worker_1_port,
'squares_2', 'SELECT s, s*s FROM generate_series(3, 4) s');
store_intermediate_result_on_node
-----------------------------------
(1 row)
SAVEPOINT s1;
-- results aren't available on coordinator yet
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
ROLLBACK TO SAVEPOINT s1;
-- fetch from worker 2 should fail
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port);
ERROR: could not open file "base/pgsql_job_cache/10_0_200/squares_1.data": No such file or directory
CONTEXT: while executing command on localhost:57638
ROLLBACK TO SAVEPOINT s1;
-- still, results aren't available on coordinator yet
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
ROLLBACK TO SAVEPOINT s1;
-- fetch from worker 1 should succeed
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port);
fetch_intermediate_results
----------------------------
114
(1 row)
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
x | x2
---+----
1 | 1
2 | 4
3 | 9
4 | 16
(4 rows)
-- fetching again should succeed
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port);
fetch_intermediate_results
----------------------------
114
(1 row)
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
x | x2
---+----
1 | 1
2 | 4
3 | 9
4 | 16
(4 rows)
ROLLBACK TO SAVEPOINT s1;
-- empty result id list should succeed
SELECT * FROM fetch_intermediate_results(ARRAY[]::text[], 'localhost', :worker_1_port);
fetch_intermediate_results
----------------------------
0
(1 row)
-- null in result id list should error gracefully
SELECT * FROM fetch_intermediate_results(ARRAY[NULL, 'squares_1', 'squares_2']::text[], 'localhost', :worker_1_port);
ERROR: worker array object cannot contain null values
END;
-- results should have been deleted after transaction commit
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
DROP SCHEMA intermediate_results CASCADE;
NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table interesting_squares

View File

@@ -2,6 +2,12 @@
CREATE SCHEMA intermediate_results;
SET search_path TO 'intermediate_results';
-- helper udfs
CREATE OR REPLACE FUNCTION pg_catalog.store_intermediate_result_on_node(nodename text, nodeport int, result_id text, query text)
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'citus', $$store_intermediate_result_on_node$$;
-- in the same transaction we can read a result
BEGIN;
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
@@ -218,4 +224,49 @@ SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_s
EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['stored_squares'], 'text') AS res (s intermediate_results.square_type);
END;
--
-- fetch_intermediate_results
--
-- straightforward, single-result case
BEGIN;
SELECT broadcast_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1, 5) s');
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1']::text[], 'localhost', :worker_2_port);
SELECT * FROM read_intermediate_result('squares_1', 'binary') AS res (x int, x2 int);
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1']::text[], 'localhost', :worker_1_port);
SELECT * FROM read_intermediate_result('squares_1', 'binary') AS res (x int, x2 int);
END;
-- multiple results, and some error cases
BEGIN;
SELECT store_intermediate_result_on_node('localhost', :worker_1_port,
'squares_1', 'SELECT s, s*s FROM generate_series(1, 2) s');
SELECT store_intermediate_result_on_node('localhost', :worker_1_port,
'squares_2', 'SELECT s, s*s FROM generate_series(3, 4) s');
SAVEPOINT s1;
-- results aren't available on coordinator yet
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ROLLBACK TO SAVEPOINT s1;
-- fetch from worker 2 should fail
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port);
ROLLBACK TO SAVEPOINT s1;
-- still, results aren't available on coordinator yet
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ROLLBACK TO SAVEPOINT s1;
-- fetch from worker 1 should succeed
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port);
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
-- fetching again should succeed
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port);
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ROLLBACK TO SAVEPOINT s1;
-- empty result id list should succeed
SELECT * FROM fetch_intermediate_results(ARRAY[]::text[], 'localhost', :worker_1_port);
-- null in result id list should error gracefully
SELECT * FROM fetch_intermediate_results(ARRAY[NULL, 'squares_1', 'squares_2']::text[], 'localhost', :worker_1_port);
END;
-- results should have been deleted after transaction commit
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
DROP SCHEMA intermediate_results CASCADE;