From 8a284aab9283667a52c50e11d249b54edfa23558 Mon Sep 17 00:00:00 2001 From: eren Date: Thu, 24 Mar 2016 13:54:10 +0200 Subject: [PATCH] Fixes issue #313 Prior to this change, it was not possible to use UDFs in repartitioned subqueries. The reason is that we were setting the search path explicitly and omiting public schema from that path. This change adds the public schema to the explicitly set search path. --- .../worker/worker_merge_protocol.c | 3 ++ .../multi_repartitioned_subquery_udf.out | 44 ++++++++++++++++++ src/test/regress/multi_schedule | 1 + .../sql/multi_repartitioned_subquery_udf.sql | 46 +++++++++++++++++++ 4 files changed, 94 insertions(+) create mode 100644 src/test/regress/expected/multi_repartitioned_subquery_udf.out create mode 100644 src/test/regress/sql/multi_repartitioned_subquery_udf.sql diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 93e4e31aa..edd4dfb8b 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -160,6 +160,9 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS) appendStringInfo(setSearchPathString, SET_SEARCH_PATH_COMMAND, jobSchemaName->data); + /* Add "public" to search path to access UDFs in public schema */ + appendStringInfo(setSearchPathString, ",public"); + connected = SPI_connect(); if (connected != SPI_OK_CONNECT) { diff --git a/src/test/regress/expected/multi_repartitioned_subquery_udf.out b/src/test/regress/expected/multi_repartitioned_subquery_udf.out new file mode 100644 index 000000000..288ee5427 --- /dev/null +++ b/src/test/regress/expected/multi_repartitioned_subquery_udf.out @@ -0,0 +1,44 @@ +-- +-- MULTI_REPARTITIONED_SUBQUERY_UDF +-- +-- Create UDF in master and workers +\c - - - :master_port +DROP FUNCTION IF EXISTS median(double precision[]); +NOTICE: function median(pg_catalog.float8[]) does not exist, skipping +CREATE FUNCTION median(double precision[]) RETURNS double precision +LANGUAGE sql IMMUTABLE AS $_$ + SELECT AVG(val) FROM + (SELECT val FROM unnest($1) val + ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2) + OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub; +$_$; +\c - - - :worker_1_port +DROP FUNCTION IF EXISTS median(double precision[]); +NOTICE: function median(pg_catalog.float8[]) does not exist, skipping +CREATE FUNCTION median(double precision[]) RETURNS double precision +LANGUAGE sql IMMUTABLE AS $_$ + SELECT AVG(val) FROM + (SELECT val FROM unnest($1) val + ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2) + OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub; +$_$; +\c - - - :worker_2_port +DROP FUNCTION IF EXISTS median(double precision[]); +NOTICE: function median(pg_catalog.float8[]) does not exist, skipping +CREATE FUNCTION median(double precision[]) RETURNS double precision +LANGUAGE sql IMMUTABLE AS $_$ + SELECT AVG(val) FROM + (SELECT val FROM unnest($1) val + ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2) + OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub; +$_$; +-- Run query on master +\c - - - :master_port +SET citus.task_executor_type TO 'task-tracker'; +SELECT * FROM (SELECT median(ARRAY[1,2,sum(l_suppkey)]) as median, count(*) + FROM lineitem GROUP BY l_partkey) AS a + WHERE median > 2; + median | count +--------+------- +(0 rows) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index adb8d68f7..c690f5da2 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -119,6 +119,7 @@ test: multi_simple_queries test: multi_utilities test: multi_create_insert_proxy test: multi_data_types +test: multi_repartitioned_subquery_udf # ---------- # multi_large_shardid stages more shards into lineitem diff --git a/src/test/regress/sql/multi_repartitioned_subquery_udf.sql b/src/test/regress/sql/multi_repartitioned_subquery_udf.sql new file mode 100644 index 000000000..a245cd420 --- /dev/null +++ b/src/test/regress/sql/multi_repartitioned_subquery_udf.sql @@ -0,0 +1,46 @@ +-- +-- MULTI_REPARTITIONED_SUBQUERY_UDF +-- + +-- Create UDF in master and workers +\c - - - :master_port +DROP FUNCTION IF EXISTS median(double precision[]); + +CREATE FUNCTION median(double precision[]) RETURNS double precision +LANGUAGE sql IMMUTABLE AS $_$ + SELECT AVG(val) FROM + (SELECT val FROM unnest($1) val + ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2) + OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub; +$_$; + +\c - - - :worker_1_port +DROP FUNCTION IF EXISTS median(double precision[]); + +CREATE FUNCTION median(double precision[]) RETURNS double precision +LANGUAGE sql IMMUTABLE AS $_$ + SELECT AVG(val) FROM + (SELECT val FROM unnest($1) val + ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2) + OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub; +$_$; + +\c - - - :worker_2_port +DROP FUNCTION IF EXISTS median(double precision[]); + +CREATE FUNCTION median(double precision[]) RETURNS double precision +LANGUAGE sql IMMUTABLE AS $_$ + SELECT AVG(val) FROM + (SELECT val FROM unnest($1) val + ORDER BY 1 LIMIT 2 - MOD(array_upper($1, 1), 2) + OFFSET CEIL(array_upper($1, 1) / 2.0) - 1) sub; +$_$; + +-- Run query on master +\c - - - :master_port + +SET citus.task_executor_type TO 'task-tracker'; + +SELECT * FROM (SELECT median(ARRAY[1,2,sum(l_suppkey)]) as median, count(*) + FROM lineitem GROUP BY l_partkey) AS a + WHERE median > 2;