Implement read_intermediate_results

pull/3304/head
Hadi Moshayedi 2019-12-13 15:17:27 -08:00
parent 7ff4ce2169
commit 113bd1e5f1
6 changed files with 248 additions and 18 deletions

View File

@ -92,10 +92,14 @@ static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver);
static char * CreateIntermediateResultsDirectory(void); static char * CreateIntermediateResultsDirectory(void);
static char * IntermediateResultsDirectory(void); static char * IntermediateResultsDirectory(void);
static char * QueryResultFileName(const char *resultId); static char * QueryResultFileName(const char *resultId);
static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo,
char *copyFormat,
Datum *resultIdArray,
int resultCount);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(read_intermediate_result); PG_FUNCTION_INFO_V1(read_intermediate_result);
PG_FUNCTION_INFO_V1(read_intermediate_result_array);
PG_FUNCTION_INFO_V1(broadcast_intermediate_result); PG_FUNCTION_INFO_V1(broadcast_intermediate_result);
PG_FUNCTION_INFO_V1(create_intermediate_result); PG_FUNCTION_INFO_V1(create_intermediate_result);
@ -693,31 +697,75 @@ IntermediateResultSize(char *resultId)
Datum Datum
read_intermediate_result(PG_FUNCTION_ARGS) read_intermediate_result(PG_FUNCTION_ARGS)
{ {
text *resultIdText = PG_GETARG_TEXT_P(0); Datum resultId = PG_GETARG_DATUM(0);
char *resultIdString = text_to_cstring(resultIdText);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1); Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum);
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum);
struct stat fileStat; CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL; ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, &resultId, 1);
PG_RETURN_DATUM(0);
}
/*
* read_intermediate_result_array returns the set of records in a set of given
* COPY-formatted intermediate result files.
*
* The usage and semantics of this is same as read_intermediate_result(), except
* that its first argument is an array of result ids.
*/
Datum
read_intermediate_result_array(PG_FUNCTION_ARGS)
{
ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum);
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum);
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
char *resultFileName = QueryResultFileName(resultIdString); int32 resultCount = ArrayGetNItems(ARR_NDIM(resultIdObject), ARR_DIMS(
resultIdObject));
Datum *resultIdArray = DeconstructArrayObject(resultIdObject);
ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel,
resultIdArray, resultCount);
PG_RETURN_DATUM(0);
}
/*
* ReadIntermediateResultsIntoFuncOutput reads the given result files and stores
* them at the function's output tuple store. Errors out if any of the result files
* don't exist.
*/
static void
ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
Datum *resultIdArray, int resultCount)
{
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
for (int resultIndex = 0; resultIndex < resultCount; resultIndex++)
{
char *resultId = TextDatumGetCString(resultIdArray[resultIndex]);
char *resultFileName = QueryResultFileName(resultId);
struct stat fileStat;
int statOK = stat(resultFileName, &fileStat); int statOK = stat(resultFileName, &fileStat);
if (statOK != 0) if (statOK != 0)
{ {
ereport(ERROR, (errcode_for_file_access(), ereport(ERROR, (errcode_for_file_access(),
errmsg("result \"%s\" does not exist", resultIdString))); errmsg("result \"%s\" does not exist", resultId)));
} }
Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupleDescriptor); ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore);
}
ReadFileIntoTupleStore(resultFileName, copyFormatLabel, tupleDescriptor, tupstore); tuplestore_donestoring(tupleStore);
tuplestore_donestoring(tupstore);
return (Datum) 0;
} }

View File

@ -1,3 +1,5 @@
#include "udfs/read_intermediate_results/9.2-1.sql"
ALTER TABLE pg_catalog.pg_dist_colocation ADD distributioncolumncollation oid; ALTER TABLE pg_catalog.pg_dist_colocation ADD distributioncolumncollation oid;
UPDATE pg_catalog.pg_dist_colocation dc SET distributioncolumncollation = t.typcollation UPDATE pg_catalog.pg_dist_colocation dc SET distributioncolumncollation = t.typcollation
FROM pg_catalog.pg_type t WHERE t.oid = dc.distributioncolumntype; FROM pg_catalog.pg_type t WHERE t.oid = dc.distributioncolumntype;

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.read_intermediate_results(
result_ids text[],
format pg_catalog.citus_copy_format default 'csv')
RETURNS SETOF record
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$read_intermediate_result_array$$;
COMMENT ON FUNCTION pg_catalog.read_intermediate_results(text[],pg_catalog.citus_copy_format)
IS 'read a set files and return them as a set of records';

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.read_intermediate_results(
result_ids text[],
format pg_catalog.citus_copy_format default 'csv')
RETURNS SETOF record
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$read_intermediate_result_array$$;
COMMENT ON FUNCTION pg_catalog.read_intermediate_results(text[],pg_catalog.citus_copy_format)
IS 'read a set files and return them as a set of records';

View File

@ -122,7 +122,7 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series
(1 row) (1 row)
SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int); SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int);
ERROR: invalid input syntax for integer: "PGCOPY" ERROR: invalid input syntax for type integer: "PGCOPY"
END; END;
-- try a composite type -- try a composite type
CREATE TYPE intermediate_results.square_type AS (x text, x2 int); CREATE TYPE intermediate_results.square_type AS (x text, x2 int);
@ -259,6 +259,116 @@ select broadcast_intermediate_result('a', 'prepare foo as select 1');
ERROR: cannot execute utility commands ERROR: cannot execute utility commands
select create_intermediate_result('a', 'create table foo(int serial)'); select create_intermediate_result('a', 'create table foo(int serial)');
ERROR: cannot execute utility commands ERROR: cannot execute utility commands
--
-- read_intermediate_results
--
BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'),
create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(4,6) s'),
create_intermediate_result('squares_3', 'SELECT s, s*s FROM generate_series(7,10) s');
create_intermediate_result | create_intermediate_result | create_intermediate_result
----------------------------+----------------------------+----------------------------
3 | 3 | 4
(1 row)
SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int);
count
-------
0
(1 row)
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
x | x2
---+----
1 | 1
2 | 4
3 | 9
(3 rows)
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2', 'squares_3']::text[], 'binary') AS res (x int, x2 int);
x | x2
----+-----
1 | 1
2 | 4
3 | 9
4 | 16
5 | 25
6 | 36
7 | 49
8 | 64
9 | 81
10 | 100
(10 rows)
COMMIT;
-- in separate transactions, the result is no longer available
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,5) s');
create_intermediate_result
----------------------------
5
(1 row)
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
-- error behaviour, and also check that results are deleted on rollback
BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s');
create_intermediate_result
----------------------------
3
(1 row)
SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int);
ERROR: result "notexistingfile" does not exist
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int);
ERROR: result "notexistingfile" does not exist
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int);
ERROR: null array element not allowed in this context
ROLLBACK TO SAVEPOINT s1;
-- after rollbacks we should be able to run vail read_intermediate_results still.
SELECT count(*) FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
count
-------
3
(1 row)
SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int);
count
-------
0
(1 row)
END;
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
-- Test non-binary format: read_intermediate_results(..., 'text')
BEGIN;
-- ROW(...) types switch the output format to text
SELECT broadcast_intermediate_result('stored_squares_1',
'SELECT s, s*s, ROW(1::text, 2) FROM generate_series(1,3) s'),
broadcast_intermediate_result('stored_squares_2',
'SELECT s, s*s, ROW(2::text, 3) FROM generate_series(4,6) s');
broadcast_intermediate_result | broadcast_intermediate_result
-------------------------------+-------------------------------
3 | 3
(1 row)
-- query the intermediate result in a router query using text format
SELECT * FROM interesting_squares JOIN (
SELECT * FROM
read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type)
) squares
ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;
user_id | interested_in | x | x2 | z
---------+---------------+---+----+-------
jon | 2 | 2 | 4 | (1,2)
jon | 5 | 5 | 25 | (2,3)
(2 rows)
END;
DROP SCHEMA intermediate_results CASCADE; DROP SCHEMA intermediate_results CASCADE;
NOTICE: drop cascades to 5 other objects NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table interesting_squares DETAIL: drop cascades to table interesting_squares

View File

@ -148,4 +148,58 @@ select broadcast_intermediate_result('a', 'create table foo(int serial)');
select broadcast_intermediate_result('a', 'prepare foo as select 1'); select broadcast_intermediate_result('a', 'prepare foo as select 1');
select create_intermediate_result('a', 'create table foo(int serial)'); select create_intermediate_result('a', 'create table foo(int serial)');
--
-- read_intermediate_results
--
BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'),
create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(4,6) s'),
create_intermediate_result('squares_3', 'SELECT s, s*s FROM generate_series(7,10) s');
SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int);
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2', 'squares_3']::text[], 'binary') AS res (x int, x2 int);
COMMIT;
-- in separate transactions, the result is no longer available
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,5) s');
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
-- error behaviour, and also check that results are deleted on rollback
BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s');
SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int);
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int);
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int);
ROLLBACK TO SAVEPOINT s1;
-- after rollbacks we should be able to run vail read_intermediate_results still.
SELECT count(*) FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int);
END;
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
-- Test non-binary format: read_intermediate_results(..., 'text')
BEGIN;
-- ROW(...) types switch the output format to text
SELECT broadcast_intermediate_result('stored_squares_1',
'SELECT s, s*s, ROW(1::text, 2) FROM generate_series(1,3) s'),
broadcast_intermediate_result('stored_squares_2',
'SELECT s, s*s, ROW(2::text, 3) FROM generate_series(4,6) s');
-- query the intermediate result in a router query using text format
SELECT * FROM interesting_squares JOIN (
SELECT * FROM
read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type)
) squares
ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;
END;
DROP SCHEMA intermediate_results CASCADE; DROP SCHEMA intermediate_results CASCADE;