Fix distributed aggregation for non superuser roles

Moves support functions to pg_catalog for now. We'd prefer a different solution
for when we're creating these support functions dynamically
pull/3221/head
Philip Dubé 2019-11-25 20:05:28 +00:00 committed by Philip Dubé
parent f81785ad14
commit a81e6a81ab
4 changed files with 75 additions and 22 deletions

View File

@ -3119,14 +3119,14 @@ AggregateFunctionOid(const char *functionName, Oid inputType)
/* /*
* AggregateFunctionOidWithoutInput performs a reverse lookup on aggregate function name, * CitusFunctionOidWithSignature looks up a function with given input types.
* and returns the corresponding aggregate function oid for the given function * Looks in pg_catalog schema, as this function's sole purpose is
* name and input type. * support aggregate lookup.
*/ */
static Oid static Oid
CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes) CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes)
{ {
List *aggregateName = list_make2(makeString("citus"), makeString(functionName)); List *aggregateName = list_make2(makeString("pg_catalog"), makeString(functionName));
FuncCandidateList clist = FuncnameGetCandidates(aggregateName, numargs, NIL, false, FuncCandidateList clist = FuncnameGetCandidates(aggregateName, numargs, NIL, false,
false, true); false, true);
@ -3144,7 +3144,7 @@ CitusFunctionOidWithSignature(char *functionName, int numargs, Oid *argtypes)
/* /*
* Lookup oid of citus.worker_partial_agg * WorkerPartialAggOid looks up oid of pg_catalog.worker_partial_agg
*/ */
static Oid static Oid
WorkerPartialAggOid() WorkerPartialAggOid()
@ -3159,7 +3159,7 @@ WorkerPartialAggOid()
/* /*
* Lookup oid of citus.coord_combine_agg * CoordCombineAggOid looks up oid of pg_catalog.coord_combine_agg
*/ */
static Oid static Oid
CoordCombineAggOid() CoordCombineAggOid()

View File

@ -21,53 +21,68 @@ UPDATE pg_dist_colocation SET replicationfactor = -1 WHERE distributioncolumntyp
DROP FUNCTION IF EXISTS pg_catalog.master_initialize_node_metadata; DROP FUNCTION IF EXISTS pg_catalog.master_initialize_node_metadata;
-- Support infrastructure for distributing aggregation -- Support infrastructure for distributing aggregation
CREATE FUNCTION citus.worker_partial_agg_sfunc(internal, oid, anyelement) CREATE FUNCTION pg_catalog.worker_partial_agg_sfunc(internal, oid, anyelement)
RETURNS internal RETURNS internal
AS 'MODULE_PATHNAME' AS 'MODULE_PATHNAME'
LANGUAGE C PARALLEL SAFE; LANGUAGE C PARALLEL SAFE;
COMMENT ON FUNCTION citus.worker_partial_agg_sfunc(internal, oid, anyelement) COMMENT ON FUNCTION pg_catalog.worker_partial_agg_sfunc(internal, oid, anyelement)
IS 'transition function for worker_partial_agg'; IS 'transition function for worker_partial_agg';
CREATE FUNCTION citus.worker_partial_agg_ffunc(internal) CREATE FUNCTION pg_catalog.worker_partial_agg_ffunc(internal)
RETURNS cstring RETURNS cstring
AS 'MODULE_PATHNAME' AS 'MODULE_PATHNAME'
LANGUAGE C PARALLEL SAFE; LANGUAGE C PARALLEL SAFE;
COMMENT ON FUNCTION citus.worker_partial_agg_ffunc(internal) COMMENT ON FUNCTION pg_catalog.worker_partial_agg_ffunc(internal)
IS 'finalizer for worker_partial_agg'; IS 'finalizer for worker_partial_agg';
CREATE FUNCTION citus.coord_combine_agg_sfunc(internal, oid, cstring, anyelement) CREATE FUNCTION pg_catalog.coord_combine_agg_sfunc(internal, oid, cstring, anyelement)
RETURNS internal RETURNS internal
AS 'MODULE_PATHNAME' AS 'MODULE_PATHNAME'
LANGUAGE C PARALLEL SAFE; LANGUAGE C PARALLEL SAFE;
COMMENT ON FUNCTION citus.coord_combine_agg_sfunc(internal, oid, cstring, anyelement) COMMENT ON FUNCTION pg_catalog.coord_combine_agg_sfunc(internal, oid, cstring, anyelement)
IS 'transition function for coord_combine_agg'; IS 'transition function for coord_combine_agg';
CREATE FUNCTION citus.coord_combine_agg_ffunc(internal, oid, cstring, anyelement) CREATE FUNCTION pg_catalog.coord_combine_agg_ffunc(internal, oid, cstring, anyelement)
RETURNS anyelement RETURNS anyelement
AS 'MODULE_PATHNAME' AS 'MODULE_PATHNAME'
LANGUAGE C PARALLEL SAFE; LANGUAGE C PARALLEL SAFE;
COMMENT ON FUNCTION citus.coord_combine_agg_ffunc(internal, oid, cstring, anyelement) COMMENT ON FUNCTION pg_catalog.coord_combine_agg_ffunc(internal, oid, cstring, anyelement)
IS 'finalizer for coord_combine_agg'; IS 'finalizer for coord_combine_agg';
-- select worker_partial_agg(agg, ...) -- select worker_partial_agg(agg, ...)
-- equivalent to -- equivalent to
-- select to_cstring(agg_without_ffunc(...)) -- select to_cstring(agg_without_ffunc(...))
CREATE AGGREGATE citus.worker_partial_agg(oid, anyelement) ( CREATE AGGREGATE pg_catalog.worker_partial_agg(oid, anyelement) (
STYPE = internal, STYPE = internal,
SFUNC = citus.worker_partial_agg_sfunc, SFUNC = pg_catalog.worker_partial_agg_sfunc,
FINALFUNC = citus.worker_partial_agg_ffunc FINALFUNC = pg_catalog.worker_partial_agg_ffunc
); );
COMMENT ON AGGREGATE citus.worker_partial_agg(oid, anyelement) COMMENT ON AGGREGATE pg_catalog.worker_partial_agg(oid, anyelement)
IS 'support aggregate for implementing partial aggregation on workers'; IS 'support aggregate for implementing partial aggregation on workers';
-- select coord_combine_agg(agg, col) -- select coord_combine_agg(agg, col)
-- equivalent to -- equivalent to
-- select agg_ffunc(agg_combine(from_cstring(col))) -- select agg_ffunc(agg_combine(from_cstring(col)))
CREATE AGGREGATE citus.coord_combine_agg(oid, cstring, anyelement) ( CREATE AGGREGATE pg_catalog.coord_combine_agg(oid, cstring, anyelement) (
STYPE = internal, STYPE = internal,
SFUNC = citus.coord_combine_agg_sfunc, SFUNC = pg_catalog.coord_combine_agg_sfunc,
FINALFUNC = citus.coord_combine_agg_ffunc, FINALFUNC = pg_catalog.coord_combine_agg_ffunc,
FINALFUNC_EXTRA FINALFUNC_EXTRA
); );
COMMENT ON AGGREGATE citus.coord_combine_agg(oid, cstring, anyelement) COMMENT ON AGGREGATE pg_catalog.coord_combine_agg(oid, cstring, anyelement)
IS 'support aggregate for implementing combining partial aggregate results from workers'; IS 'support aggregate for implementing combining partial aggregate results from workers';
REVOKE ALL ON FUNCTION pg_catalog.worker_partial_agg_ffunc FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_catalog.worker_partial_agg_sfunc FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_catalog.coord_combine_agg_ffunc FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_catalog.coord_combine_agg_sfunc FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_catalog.worker_partial_agg FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_catalog.coord_combine_agg FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.worker_partial_agg_ffunc TO PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.worker_partial_agg_sfunc TO PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.coord_combine_agg_ffunc TO PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.coord_combine_agg_sfunc TO PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.worker_partial_agg TO PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.coord_combine_agg TO PUBLIC;

View File

@ -162,5 +162,30 @@ select array_collect_sort(val) from aggdata;
{0,2,2,3,4,5,8,NULL,NULL,NULL,NULL} {0,2,2,3,4,5,8,NULL,NULL,NULL,NULL}
(1 row) (1 row)
-- Test multiuser scenario
create user notsuper;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
grant all on schema aggregate_support to notsuper;
grant all on all tables in schema aggregate_support to notsuper;
select run_command_on_workers($$
create user notsuper;
grant all on schema aggregate_support to notsuper;
grant all on all tables in schema aggregate_support to notsuper;
$$);
run_command_on_workers
-----------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
set role notsuper;
select array_collect_sort(val) from aggdata;
array_collect_sort
-------------------------------------
{0,2,2,3,4,5,8,NULL,NULL,NULL,NULL}
(1 row)
reset role;
set client_min_messages to error; set client_min_messages to error;
drop schema aggregate_support cascade; drop schema aggregate_support cascade;

View File

@ -125,5 +125,18 @@ select create_distributed_function('array_collect_sort(int)');
select array_collect_sort(val) from aggdata; select array_collect_sort(val) from aggdata;
-- Test multiuser scenario
create user notsuper;
grant all on schema aggregate_support to notsuper;
grant all on all tables in schema aggregate_support to notsuper;
select run_command_on_workers($$
create user notsuper;
grant all on schema aggregate_support to notsuper;
grant all on all tables in schema aggregate_support to notsuper;
$$);
set role notsuper;
select array_collect_sort(val) from aggdata;
reset role;
set client_min_messages to error; set client_min_messages to error;
drop schema aggregate_support cascade; drop schema aggregate_support cascade;