diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 3c82bfb47..4d12bda38 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -3119,14 +3119,14 @@ AggregateFunctionOid(const char *functionName, Oid inputType) /* - * AggregateFunctionOidWithoutInput performs a reverse lookup on aggregate function name, - * and returns the corresponding aggregate function oid for the given function - * name and input type. + * CitusFunctionOidWithSignature looks up a function with given input types. + * Looks in pg_catalog schema, as this function's sole purpose is + * support aggregate lookup. */ static Oid CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes) { - List *aggregateName = list_make2(makeString("citus"), makeString(functionName)); + List *aggregateName = list_make2(makeString("pg_catalog"), makeString(functionName)); FuncCandidateList clist = FuncnameGetCandidates(aggregateName, numargs, NIL, false, false, true); @@ -3144,7 +3144,7 @@ CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes) /* - * Lookup oid of citus.worker_partial_agg + * WorkerPartialAggOid looks up oid of pg_catalog.worker_partial_agg */ static Oid WorkerPartialAggOid() @@ -3159,7 +3159,7 @@ WorkerPartialAggOid() /* - * Lookup oid of citus.coord_combine_agg + * CoordCombineAggOid looks up oid of pg_catalog.coord_combine_agg */ static Oid CoordCombineAggOid() diff --git a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql index 743b9a42d..df7a4418f 100644 --- a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql +++ b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql @@ -21,53 +21,68 @@ UPDATE pg_dist_colocation SET replicationfactor = -1 WHERE distributioncolumntyp DROP FUNCTION IF EXISTS pg_catalog.master_initialize_node_metadata; -- Support infrastructure for distributing aggregation -CREATE FUNCTION citus.worker_partial_agg_sfunc(internal, oid, anyelement) +CREATE FUNCTION pg_catalog.worker_partial_agg_sfunc(internal, oid, anyelement) RETURNS internal AS 'MODULE_PATHNAME' LANGUAGE C PARALLEL SAFE; -COMMENT ON FUNCTION citus.worker_partial_agg_sfunc(internal, oid, anyelement) +COMMENT ON FUNCTION pg_catalog.worker_partial_agg_sfunc(internal, oid, anyelement) IS 'transition function for worker_partial_agg'; -CREATE FUNCTION citus.worker_partial_agg_ffunc(internal) +CREATE FUNCTION pg_catalog.worker_partial_agg_ffunc(internal) RETURNS cstring AS 'MODULE_PATHNAME' LANGUAGE C PARALLEL SAFE; -COMMENT ON FUNCTION citus.worker_partial_agg_ffunc(internal) +COMMENT ON FUNCTION pg_catalog.worker_partial_agg_ffunc(internal) IS 'finalizer for worker_partial_agg'; -CREATE FUNCTION citus.coord_combine_agg_sfunc(internal, oid, cstring, anyelement) +CREATE FUNCTION pg_catalog.coord_combine_agg_sfunc(internal, oid, cstring, anyelement) RETURNS internal AS 'MODULE_PATHNAME' LANGUAGE C PARALLEL SAFE; -COMMENT ON FUNCTION citus.coord_combine_agg_sfunc(internal, oid, cstring, anyelement) +COMMENT ON FUNCTION pg_catalog.coord_combine_agg_sfunc(internal, oid, cstring, anyelement) IS 'transition function for coord_combine_agg'; -CREATE FUNCTION citus.coord_combine_agg_ffunc(internal, oid, cstring, anyelement) +CREATE FUNCTION pg_catalog.coord_combine_agg_ffunc(internal, oid, cstring, anyelement) RETURNS anyelement AS 'MODULE_PATHNAME' LANGUAGE C PARALLEL SAFE; -COMMENT ON FUNCTION citus.coord_combine_agg_ffunc(internal, oid, cstring, anyelement) +COMMENT ON FUNCTION pg_catalog.coord_combine_agg_ffunc(internal, oid, cstring, anyelement) IS 'finalizer for coord_combine_agg'; -- select worker_partial_agg(agg, ...) -- equivalent to -- select to_cstring(agg_without_ffunc(...)) -CREATE AGGREGATE citus.worker_partial_agg(oid, anyelement) ( +CREATE AGGREGATE pg_catalog.worker_partial_agg(oid, anyelement) ( STYPE = internal, - SFUNC = citus.worker_partial_agg_sfunc, - FINALFUNC = citus.worker_partial_agg_ffunc + SFUNC = pg_catalog.worker_partial_agg_sfunc, + FINALFUNC = pg_catalog.worker_partial_agg_ffunc ); -COMMENT ON AGGREGATE citus.worker_partial_agg(oid, anyelement) +COMMENT ON AGGREGATE pg_catalog.worker_partial_agg(oid, anyelement) IS 'support aggregate for implementing partial aggregation on workers'; -- select coord_combine_agg(agg, col) -- equivalent to -- select agg_ffunc(agg_combine(from_cstring(col))) -CREATE AGGREGATE citus.coord_combine_agg(oid, cstring, anyelement) ( +CREATE AGGREGATE pg_catalog.coord_combine_agg(oid, cstring, anyelement) ( STYPE = internal, - SFUNC = citus.coord_combine_agg_sfunc, - FINALFUNC = citus.coord_combine_agg_ffunc, + SFUNC = pg_catalog.coord_combine_agg_sfunc, + FINALFUNC = pg_catalog.coord_combine_agg_ffunc, FINALFUNC_EXTRA ); -COMMENT ON AGGREGATE citus.coord_combine_agg(oid, cstring, anyelement) +COMMENT ON AGGREGATE pg_catalog.coord_combine_agg(oid, cstring, anyelement) IS 'support aggregate for implementing combining partial aggregate results from workers'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_partial_agg_ffunc FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.worker_partial_agg_sfunc FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.coord_combine_agg_ffunc FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.coord_combine_agg_sfunc FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.worker_partial_agg FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.coord_combine_agg FROM PUBLIC; + +GRANT EXECUTE ON FUNCTION pg_catalog.worker_partial_agg_ffunc TO PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.worker_partial_agg_sfunc TO PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.coord_combine_agg_ffunc TO PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.coord_combine_agg_sfunc TO PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.worker_partial_agg TO PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.coord_combine_agg TO PUBLIC; + diff --git a/src/test/regress/expected/aggregate_support.out b/src/test/regress/expected/aggregate_support.out index e2fc5d72f..eddb3e0d2 100644 --- a/src/test/regress/expected/aggregate_support.out +++ b/src/test/regress/expected/aggregate_support.out @@ -162,5 +162,30 @@ select array_collect_sort(val) from aggdata; {0,2,2,3,4,5,8,NULL,NULL,NULL,NULL} (1 row) +-- Test multiuser scenario +create user notsuper; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +grant all on schema aggregate_support to notsuper; +grant all on all tables in schema aggregate_support to notsuper; +select run_command_on_workers($$ +create user notsuper; +grant all on schema aggregate_support to notsuper; +grant all on all tables in schema aggregate_support to notsuper; +$$); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"CREATE ROLE") + (localhost,57638,t,"CREATE ROLE") +(2 rows) + +set role notsuper; +select array_collect_sort(val) from aggdata; + array_collect_sort +------------------------------------- + {0,2,2,3,4,5,8,NULL,NULL,NULL,NULL} +(1 row) + +reset role; set client_min_messages to error; drop schema aggregate_support cascade; diff --git a/src/test/regress/sql/aggregate_support.sql b/src/test/regress/sql/aggregate_support.sql index 08c56f95c..700dd53a1 100644 --- a/src/test/regress/sql/aggregate_support.sql +++ b/src/test/regress/sql/aggregate_support.sql @@ -125,5 +125,18 @@ select create_distributed_function('array_collect_sort(int)'); select array_collect_sort(val) from aggdata; +-- Test multiuser scenario +create user notsuper; +grant all on schema aggregate_support to notsuper; +grant all on all tables in schema aggregate_support to notsuper; +select run_command_on_workers($$ +create user notsuper; +grant all on schema aggregate_support to notsuper; +grant all on all tables in schema aggregate_support to notsuper; +$$); +set role notsuper; +select array_collect_sort(val) from aggdata; +reset role; + set client_min_messages to error; drop schema aggregate_support cascade;