Merge pull request #402 from citusdata/bugfix/#313-UDF-on-repartitioned-subqueries

Make UDFs Work with Repartitioned Subqueries
pull/419/head
Murat Tuncer 2016-03-30 15:45:20 +03:00
commit 14c835b37d
4 changed files with 94 additions and 0 deletions

View File

@ -160,6 +160,9 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
appendStringInfo(setSearchPathString, SET_SEARCH_PATH_COMMAND, jobSchemaName->data);
/* Add "public" to search path to access UDFs in public schema */
appendStringInfo(setSearchPathString, ",public");
connected = SPI_connect();
if (connected != SPI_OK_CONNECT)
{

View File

@ -0,0 +1,44 @@
--
-- MULTI_REPARTITIONED_SUBQUERY_UDF
--
-- Create UDF in master and workers
\c - - - :master_port
DROP FUNCTION IF EXISTS median(double precision[]);
NOTICE: function median(pg_catalog.float8[]) does not exist, skipping
CREATE FUNCTION median(double precision[]) RETURNS double precision
LANGUAGE sql IMMUTABLE AS $_$
SELECT AVG(val) FROM
(SELECT val FROM unnest($1) val
ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2)
OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub;
$_$;
\c - - - :worker_1_port
DROP FUNCTION IF EXISTS median(double precision[]);
NOTICE: function median(pg_catalog.float8[]) does not exist, skipping
CREATE FUNCTION median(double precision[]) RETURNS double precision
LANGUAGE sql IMMUTABLE AS $_$
SELECT AVG(val) FROM
(SELECT val FROM unnest($1) val
ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2)
OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub;
$_$;
\c - - - :worker_2_port
DROP FUNCTION IF EXISTS median(double precision[]);
NOTICE: function median(pg_catalog.float8[]) does not exist, skipping
CREATE FUNCTION median(double precision[]) RETURNS double precision
LANGUAGE sql IMMUTABLE AS $_$
SELECT AVG(val) FROM
(SELECT val FROM unnest($1) val
ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2)
OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub;
$_$;
-- Run query on master
\c - - - :master_port
SET citus.task_executor_type TO 'task-tracker';
SELECT * FROM (SELECT median(ARRAY[1,2,sum(l_suppkey)]) as median, count(*)
FROM lineitem GROUP BY l_partkey) AS a
WHERE median > 2;
median | count
--------+-------
(0 rows)

View File

@ -119,6 +119,7 @@ test: multi_simple_queries
test: multi_utilities
test: multi_create_insert_proxy
test: multi_data_types
test: multi_repartitioned_subquery_udf
# ----------
# multi_large_shardid stages more shards into lineitem

View File

@ -0,0 +1,46 @@
--
-- MULTI_REPARTITIONED_SUBQUERY_UDF
--
-- Create UDF in master and workers
\c - - - :master_port
DROP FUNCTION IF EXISTS median(double precision[]);
CREATE FUNCTION median(double precision[]) RETURNS double precision
LANGUAGE sql IMMUTABLE AS $_$
SELECT AVG(val) FROM
(SELECT val FROM unnest($1) val
ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2)
OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub;
$_$;
\c - - - :worker_1_port
DROP FUNCTION IF EXISTS median(double precision[]);
CREATE FUNCTION median(double precision[]) RETURNS double precision
LANGUAGE sql IMMUTABLE AS $_$
SELECT AVG(val) FROM
(SELECT val FROM unnest($1) val
ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2)
OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub;
$_$;
\c - - - :worker_2_port
DROP FUNCTION IF EXISTS median(double precision[]);
CREATE FUNCTION median(double precision[]) RETURNS double precision
LANGUAGE sql IMMUTABLE AS $_$
SELECT AVG(val) FROM
(SELECT val FROM unnest($1) val
ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2)
OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub;
$_$;
-- Run query on master
\c - - - :master_port
SET citus.task_executor_type TO 'task-tracker';
SELECT * FROM (SELECT median(ARRAY[1,2,sum(l_suppkey)]) as median, count(*)
FROM lineitem GROUP BY l_partkey) AS a
WHERE median > 2;