diff --git a/src/backend/distributed/commands/aggregate.c b/src/backend/distributed/commands/aggregate.c new file mode 100644 index 000000000..4c25f41bf --- /dev/null +++ b/src/backend/distributed/commands/aggregate.c @@ -0,0 +1,107 @@ +/*------------------------------------------------------------------------- + * + * aggregate.c + * Commands for distributing AGGREGATE statements. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/commands.h" +#include "distributed/commands/utility_hook.h" +#include "distributed/deparser.h" +#include "distributed/listutils.h" +#include "distributed/metadata/dependency.h" +#include "distributed/metadata_sync.h" +#include "distributed/metadata/distobject.h" +#include "distributed/multi_executor.h" +#include "nodes/parsenodes.h" +#include "utils/lsyscache.h" + + +/* + * PostprocessDefineAggregateStmt actually creates the plan we need to execute for + * aggregate propagation. + * This is the downside of using the locally created aggregate to get the sql statement. + * + * If the aggregate depends on any non-distributed relation, Citus can not distribute it. + * In order to not to prevent users from creating local aggregates on the coordinator, + * a WARNING message will be sent to the user about the case instead of erroring out. + * + * Besides creating the plan we also make sure all (new) dependencies of the aggregate + * are created on all nodes. + */ +List * +PostprocessDefineAggregateStmt(Node *node, const char *queryString) +{ + QualifyTreeNode((Node *) node); + + DefineStmt *stmt = castNode(DefineStmt, node); + + if (!ShouldPropagate()) + { + return NIL; + } + + if (!ShouldPropagateCreateInCoordinatedTransction()) + { + return NIL; + } + + ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); + + EnsureCoordinator(); + + EnsureSequentialMode(OBJECT_AGGREGATE); + + ObjectAddress *undistributableDependency = GetUndistributableDependency( + &address); + if (undistributableDependency != NULL) + { + if (SupportedDependencyByCitus(undistributableDependency)) + { + /* + * Citus can't distribute some relations as dependency, although those + * types as supported by Citus. So we can use get_rel_name directly + */ + RangeVar *aggRangeVar = makeRangeVarFromNameList(stmt->defnames); + char *aggName = aggRangeVar->relname; + char *dependentRelationName = + get_rel_name(undistributableDependency->objectId); + + ereport(WARNING, (errmsg("Citus can't distribute aggregate \"%s\" having " + "dependency on non-distributed relation \"%s\"", + aggName, dependentRelationName), + errdetail("Aggregate will be created only locally"), + errhint("To distribute aggregate, distribute dependent " + "relations first. Then, re-create the aggregate"))); + } + else + { + char *objectType = NULL; + #if PG_VERSION_NUM >= PG_VERSION_14 + objectType = getObjectTypeDescription(undistributableDependency, false); + #else + objectType = getObjectTypeDescription(undistributableDependency); + #endif + ereport(WARNING, (errmsg("Citus can't distribute functions having " + "dependency on unsupported object of type \"%s\"", + objectType), + errdetail("Aggregate will be created only locally"))); + } + + return NIL; + } + + EnsureDependenciesExistOnAllNodes(&address); + + List *commands = CreateFunctionDDLCommandsIdempotent(&address); + + commands = lcons(DISABLE_DDL_PROPAGATION, commands); + commands = lappend(commands, ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index f1aa1fa1e..80a2b6628 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -43,11 +43,11 @@ static DistributeObjectOps Aggregate_AlterOwner = { }; static DistributeObjectOps Aggregate_Define = { .deparse = NULL, - .qualify = NULL, + .qualify = QualifyDefineAggregateStmt, .preprocess = NULL, - .postprocess = NULL, + .postprocess = PostprocessDefineAggregateStmt, .address = DefineAggregateStmtObjectAddress, - .markDistributed = false, + .markDistributed = true, }; static DistributeObjectOps Aggregate_Drop = { .deparse = DeparseDropFunctionStmt, diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 80b870b62..46eb6a2a9 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -82,7 +82,6 @@ static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldAddFunctionSignature(FunctionParameterMode mode); -static ObjectAddress * GetUndistributableDependency(ObjectAddress *functionAddress); static ObjectAddress FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs, bool missing_ok); @@ -1352,7 +1351,7 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString) * GetUndistributableDependency checks whether object has any non-distributable * dependency. If any one found, it will be returned. */ -static ObjectAddress * +ObjectAddress * GetUndistributableDependency(ObjectAddress *objectAddress) { List *dependencies = GetAllDependenciesForObject(objectAddress); @@ -1443,10 +1442,18 @@ DefineAggregateStmtObjectAddress(Node *node, bool missing_ok) ObjectWithArgs *objectWithArgs = makeNode(ObjectWithArgs); objectWithArgs->objname = stmt->defnames; - FunctionParameter *funcParam = NULL; - foreach_ptr(funcParam, linitial(stmt->args)) + if (stmt->args != NIL) { - objectWithArgs->objargs = lappend(objectWithArgs->objargs, funcParam->argType); + FunctionParameter *funcParam = NULL; + foreach_ptr(funcParam, linitial(stmt->args)) + { + objectWithArgs->objargs = lappend(objectWithArgs->objargs, + funcParam->argType); + } + } + else + { + objectWithArgs->objargs = list_make1(makeTypeName("anyelement")); } return FunctionToObjectAddress(OBJECT_AGGREGATE, objectWithArgs, missing_ok); @@ -2019,10 +2026,10 @@ ShouldAddFunctionSignature(FunctionParameterMode mode) /* - * FunctionToObjectAddress returns the ObjectAddress of a Function or Procedure based on - * its type and ObjectWithArgs describing the Function/Procedure. If missing_ok is set to - * false an error will be raised by postgres explaining the Function/Procedure could not - * be found. + * FunctionToObjectAddress returns the ObjectAddress of a Function, Procedure or + * Aggregate based on its type and ObjectWithArgs describing the + * Function/Procedure/Aggregate. If missing_ok is set to false an error will be + * raised by postgres explaining the Function/Procedure could not be found. */ static ObjectAddress FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs, diff --git a/src/backend/distributed/deparser/qualify_aggregate_stmts.c b/src/backend/distributed/deparser/qualify_aggregate_stmts.c new file mode 100644 index 000000000..9debc244a --- /dev/null +++ b/src/backend/distributed/deparser/qualify_aggregate_stmts.c @@ -0,0 +1,34 @@ +/*------------------------------------------------------------------------- + * + * qualify_aggregate_stmts.c + * Functions specialized in fully qualifying all aggregate statements. + * These functions are dispatched from qualify.c + * + * Fully qualifying aggregate statements consists of adding the schema name + * to the subject of the types as well as any other branch of the parsetree. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/namespace.h" +#include "distributed/deparser.h" +#include "nodes/makefuncs.h" +#include "utils/lsyscache.h" + +void +QualifyDefineAggregateStmt(Node *node) +{ + DefineStmt *stmt = castNode(DefineStmt, node); + + if (list_length(stmt->defnames) == 1) + { + char *objname = NULL; + Oid creationSchema = QualifiedNameGetCreationNamespace(stmt->defnames, &objname); + stmt->defnames = list_make2(makeString(get_namespace_name(creationSchema)), + linitial(stmt->defnames)); + } +} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index f43235aac..a47dc6a48 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -748,6 +748,11 @@ GetObjectTypeString(ObjectType objType) { switch (objType) { + case OBJECT_AGGREGATE: + { + return "aggregate"; + } + case OBJECT_COLLATION: { return "collation"; diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.2-4.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.2-4.sql index 2921de962..fa13dc7bd 100644 --- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.2-4.sql +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.2-4.sql @@ -12,15 +12,22 @@ BEGIN IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN EXECUTE $cmd$ + -- disable propagation to prevent EnsureCoordinator errors + -- the aggregate created here does not depend on Citus extension (yet) + -- since we add the dependency with the next command + SET citus.enable_ddl_propagation TO OFF; CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray); COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray) IS 'concatenate input arrays into a single array'; + RESET citus.enable_ddl_propagation; $cmd$; ELSE EXECUTE $cmd$ + SET citus.enable_ddl_propagation TO OFF; CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray); COMMENT ON AGGREGATE array_cat_agg(anyarray) IS 'concatenate input arrays into a single array'; + RESET citus.enable_ddl_propagation; $cmd$; END IF; diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql index 2921de962..fa13dc7bd 100644 --- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql @@ -12,15 +12,22 @@ BEGIN IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN EXECUTE $cmd$ + -- disable propagation to prevent EnsureCoordinator errors + -- the aggregate created here does not depend on Citus extension (yet) + -- since we add the dependency with the next command + SET citus.enable_ddl_propagation TO OFF; CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray); COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray) IS 'concatenate input arrays into a single array'; + RESET citus.enable_ddl_propagation; $cmd$; ELSE EXECUTE $cmd$ + SET citus.enable_ddl_propagation TO OFF; CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray); COMMENT ON AGGREGATE array_cat_agg(anyarray) IS 'concatenate input arrays into a single array'; + RESET citus.enable_ddl_propagation; $cmd$; END IF; diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index c2bf66d5b..8ec3d9e8a 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -122,6 +122,9 @@ typedef enum SearchForeignKeyColumnFlags } SearchForeignKeyColumnFlags; +/* aggregate.c - forward declarations */ +extern List * PostprocessDefineAggregateStmt(Node *node, const char *queryString); + /* cluster.c - forward declarations */ extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand, ProcessUtilityContext processUtilityContext); @@ -264,6 +267,7 @@ extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString, ProcessUtilityContext processUtilityContext); extern List * PostprocessCreateFunctionStmt(Node *stmt, const char *queryString); +extern ObjectAddress * GetUndistributableDependency(ObjectAddress *functionAddress); extern ObjectAddress CreateFunctionStmtObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress DefineAggregateStmtObjectAddress(Node *stmt, diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index ebf4a6147..e3b02cdfc 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -33,6 +33,9 @@ extern void QualifyTreeNode(Node *stmt); extern char * DeparseTreeNode(Node *stmt); extern List * DeparseTreeNodes(List *stmts); +/* forward declarations for qualify_aggregate_stmts.c */ +extern void QualifyDefineAggregateStmt(Node *node); + /* forward declarations for deparse_attribute_stmts.c */ extern char * DeparseRenameAttributeStmt(Node *); diff --git a/src/test/regress/expected/aggregate_support.out b/src/test/regress/expected/aggregate_support.out index bf35d7f4b..57bb03060 100644 --- a/src/test/regress/expected/aggregate_support.out +++ b/src/test/regress/expected/aggregate_support.out @@ -282,6 +282,21 @@ create aggregate binstragg(text, text)( combinefunc=binstragg_combinefunc, stype=text ); +-- verify that the aggregate is added into pg_dist_object, on each worker +SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.binstragg'::regproc;$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + +SELECT run_command_on_workers($$select count(*) from pg_aggregate where aggfnoid::text like '%binstragg%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + select create_distributed_function('binstragg(text,text)'); create_distributed_function --------------------------------------------------------------------- @@ -549,7 +564,6 @@ SELECT id%5, first(val ORDER BY key), last(val ORDER BY key) FROM aggdata GROUP BY id%5 ORDER BY id%5; ERROR: unsupported aggregate function first -- test aggregate with stype which is not a by-value datum --- also test our handling of the aggregate not existing on workers create function sumstring_sfunc(state text, x text) returns text immutable language plpgsql as $$ begin return (state::float8 + x::float8)::text; @@ -561,9 +575,20 @@ create aggregate sumstring(text) ( combinefunc = sumstring_sfunc, initcond = '0' ); -select sumstring(valf::text) from aggdata where valf is not null; -ERROR: function "aggregate_support.sumstring(text)" does not exist -CONTEXT: while executing command on localhost:xxxxx +-- verify that the aggregate is propagated +select aggfnoid from pg_aggregate where aggfnoid::text like '%sumstring%'; + aggfnoid +--------------------------------------------------------------------- + sumstring +(1 row) + +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%sumstring%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,aggregate_support.sumstring) + (localhost,57638,t,aggregate_support.sumstring) +(2 rows) + select create_distributed_function('sumstring(text)'); create_distributed_function --------------------------------------------------------------------- @@ -1042,5 +1067,122 @@ LEFT JOIN ref_table ON TRUE; 109 (1 row) +-- try createing aggregate having non-distributable dependency type +create table dummy_tbl (a int); +create function dummy_fnc(a dummy_tbl, d double precision) RETURNS dummy_tbl + AS $$SELECT 1;$$ LANGUAGE sql; +WARNING: Citus can't distribute function "dummy_fnc" having dependency on non-distributed relation "dummy_tbl" +DETAIL: Function will be created only locally +HINT: To distribute function, distribute dependent relations first. Then, re-create the function +-- should give warning and create aggregate local only +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); +WARNING: Citus can't distribute aggregate "dependent_agg" having dependency on non-distributed relation "dummy_tbl" +DETAIL: Aggregate will be created only locally +HINT: To distribute aggregate, distribute dependent relations first. Then, re-create the aggregate +-- clear and try again with distributed table +DROP TABLE dummy_tbl CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to function dummy_fnc(dummy_tbl,double precision) +drop cascades to function dependent_agg(double precision) +create table dummy_tbl (a int); +SELECT create_distributed_table('dummy_tbl','a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +create function dummy_fnc(a dummy_tbl, d double precision) RETURNS dummy_tbl + AS $$SELECT 1;$$ LANGUAGE sql; +-- test in tx block +-- shouldn't distribute, as citus.create_object_propagation is set to deferred +BEGIN; +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); +COMMIT; +-- verify not distributed +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"") + (localhost,57638,t,"") +(2 rows) + +drop aggregate dependent_agg ( double precision); +-- now try with create_object_propagation = immediate +SET citus.create_object_propagation TO immediate; +-- should distribute, as citus.create_object_propagation is set to immediate +-- will switch to sequential mode +BEGIN; +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); +COMMIT; +-- verify distributed +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,aggregate_support.dependent_agg) + (localhost,57638,t,aggregate_support.dependent_agg) +(2 rows) + +drop aggregate dependent_agg ( double precision); +-- now try with create_object_propagation = automatic +SET citus.create_object_propagation TO automatic; +-- should distribute, as citus.create_object_propagation is set to automatic +-- will switch to sequential mode +BEGIN; +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); +COMMIT; +-- verify distributed +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,aggregate_support.dependent_agg) + (localhost,57638,t,aggregate_support.dependent_agg) +(2 rows) + +-- verify that the aggregate is added into pg_dist_object, on each worker +SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.dependent_agg'::regproc;$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + +RESET citus.create_object_propagation; +-- drop and test outside of tx block +drop aggregate dependent_agg (float8); +-- verify that the aggregate is removed from pg_dist_object, on each worker +SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.dependent_agg'::regproc;$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,f,"ERROR: function ""aggregate_support.dependent_agg"" does not exist") + (localhost,57638,f,"ERROR: function ""aggregate_support.dependent_agg"" does not exist") +(2 rows) + +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); +--verify +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,aggregate_support.dependent_agg) + (localhost,57638,t,aggregate_support.dependent_agg) +(2 rows) + +DROP TABLE dummy_tbl CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to function dummy_fnc(dummy_tbl,double precision) +drop cascades to function dependent_agg(double precision) +SET citus.create_object_propagation TO automatic; +begin; + create type typ1 as (a int); + create or replace function fnagg(a typ1, d double precision) RETURNS typ1 AS $$SELECT 1;$$LANGUAGE sql; + create aggregate dependent_agg (float8) (stype=typ1, sfunc=fnagg); +commit; +RESET citus.create_object_propagation; +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,aggregate_support.dependent_agg) + (localhost,57638,t,aggregate_support.dependent_agg) +(2 rows) + set client_min_messages to error; drop schema aggregate_support cascade; diff --git a/src/test/regress/expected/propagate_foreign_servers.out b/src/test/regress/expected/propagate_foreign_servers.out index 2e9b164e7..bebb5a4f6 100644 --- a/src/test/regress/expected/propagate_foreign_servers.out +++ b/src/test/regress/expected/propagate_foreign_servers.out @@ -7,6 +7,15 @@ SELECT citus_remove_node('localhost', :worker_1_port); (1 row) +-- not related, but added here to test propagation of aggregates +-- to newly added nodes +CREATE AGGREGATE array_agg (anynonarray) +( + sfunc = array_agg_transfn, + stype = internal, + finalfunc = array_agg_finalfn, + finalfunc_extra +); -- create schema, extension and foreign server while the worker is removed SET citus.enable_ddl_propagation TO OFF; CREATE SCHEMA test_dependent_schema; @@ -28,6 +37,20 @@ NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipp 1 (1 row) +-- verify that the aggregate is propagated to the new node +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%propagate_foreign_server.array_agg%';$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57638,t,propagate_foreign_server.array_agg) +(1 row) + +-- verify that the aggregate is added top pg_dist_object on the new node +SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'propagate_foreign_server.array_agg'::regproc;$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57638,t,1) +(1 row) + SELECT citus_add_local_table_to_metadata('foreign_table'); citus_add_local_table_to_metadata --------------------------------------------------------------------- @@ -163,4 +186,6 @@ ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. \c - - - :master_port DROP SCHEMA propagate_foreign_server CASCADE; -NOTICE: drop cascades to extension postgres_fdw +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to function propagate_foreign_server.array_agg(anynonarray) +drop cascades to extension postgres_fdw diff --git a/src/test/regress/sql/aggregate_support.sql b/src/test/regress/sql/aggregate_support.sql index 7c82418a7..a83688d82 100644 --- a/src/test/regress/sql/aggregate_support.sql +++ b/src/test/regress/sql/aggregate_support.sql @@ -160,6 +160,10 @@ create aggregate binstragg(text, text)( combinefunc=binstragg_combinefunc, stype=text ); +-- verify that the aggregate is added into pg_dist_object, on each worker +SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.binstragg'::regproc;$$); + +SELECT run_command_on_workers($$select count(*) from pg_aggregate where aggfnoid::text like '%binstragg%';$$); select create_distributed_function('binstragg(text,text)'); @@ -313,7 +317,6 @@ SELECT id%5, first(val ORDER BY key), last(val ORDER BY key) FROM aggdata GROUP BY id%5 ORDER BY id%5; -- test aggregate with stype which is not a by-value datum --- also test our handling of the aggregate not existing on workers create function sumstring_sfunc(state text, x text) returns text immutable language plpgsql as $$ begin return (state::float8 + x::float8)::text; @@ -326,8 +329,10 @@ create aggregate sumstring(text) ( combinefunc = sumstring_sfunc, initcond = '0' ); +-- verify that the aggregate is propagated +select aggfnoid from pg_aggregate where aggfnoid::text like '%sumstring%'; +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%sumstring%';$$); -select sumstring(valf::text) from aggdata where valf is not null; select create_distributed_function('sumstring(text)'); select sumstring(valf::text) from aggdata where valf is not null; @@ -533,5 +538,79 @@ SELECT floor(AVG(COALESCE(agg_col, 10))) FROM dist_table LEFT JOIN ref_table ON TRUE; +-- try createing aggregate having non-distributable dependency type +create table dummy_tbl (a int); +create function dummy_fnc(a dummy_tbl, d double precision) RETURNS dummy_tbl + AS $$SELECT 1;$$ LANGUAGE sql; +-- should give warning and create aggregate local only +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); + +-- clear and try again with distributed table +DROP TABLE dummy_tbl CASCADE; + +create table dummy_tbl (a int); +SELECT create_distributed_table('dummy_tbl','a'); +create function dummy_fnc(a dummy_tbl, d double precision) RETURNS dummy_tbl + AS $$SELECT 1;$$ LANGUAGE sql; + +-- test in tx block +-- shouldn't distribute, as citus.create_object_propagation is set to deferred +BEGIN; +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); +COMMIT; +-- verify not distributed +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + +drop aggregate dependent_agg ( double precision); + +-- now try with create_object_propagation = immediate +SET citus.create_object_propagation TO immediate; +-- should distribute, as citus.create_object_propagation is set to immediate +-- will switch to sequential mode +BEGIN; +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); +COMMIT; + +-- verify distributed +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + +drop aggregate dependent_agg ( double precision); + +-- now try with create_object_propagation = automatic +SET citus.create_object_propagation TO automatic; +-- should distribute, as citus.create_object_propagation is set to automatic +-- will switch to sequential mode +BEGIN; +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); +COMMIT; + +-- verify distributed +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + +-- verify that the aggregate is added into pg_dist_object, on each worker +SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.dependent_agg'::regproc;$$); + +RESET citus.create_object_propagation; + +-- drop and test outside of tx block +drop aggregate dependent_agg (float8); +-- verify that the aggregate is removed from pg_dist_object, on each worker +SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.dependent_agg'::regproc;$$); +create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc); +--verify +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + +DROP TABLE dummy_tbl CASCADE; + +SET citus.create_object_propagation TO automatic; +begin; + create type typ1 as (a int); + create or replace function fnagg(a typ1, d double precision) RETURNS typ1 AS $$SELECT 1;$$LANGUAGE sql; + create aggregate dependent_agg (float8) (stype=typ1, sfunc=fnagg); +commit; +RESET citus.create_object_propagation; + +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$); + set client_min_messages to error; drop schema aggregate_support cascade; diff --git a/src/test/regress/sql/propagate_foreign_servers.sql b/src/test/regress/sql/propagate_foreign_servers.sql index f27a10e5d..cd64f7c9b 100644 --- a/src/test/regress/sql/propagate_foreign_servers.sql +++ b/src/test/regress/sql/propagate_foreign_servers.sql @@ -4,6 +4,16 @@ SET search_path TO propagate_foreign_server; -- remove node to add later SELECT citus_remove_node('localhost', :worker_1_port); +-- not related, but added here to test propagation of aggregates +-- to newly added nodes +CREATE AGGREGATE array_agg (anynonarray) +( + sfunc = array_agg_transfn, + stype = internal, + finalfunc = array_agg_finalfn, + finalfunc_extra +); + -- create schema, extension and foreign server while the worker is removed SET citus.enable_ddl_propagation TO OFF; CREATE SCHEMA test_dependent_schema; @@ -20,6 +30,13 @@ CREATE FOREIGN TABLE foreign_table ( OPTIONS (schema_name 'test_dependent_schema', table_name 'foreign_table_test'); SELECT 1 FROM citus_add_node('localhost', :master_port, groupId=>0); + +-- verify that the aggregate is propagated to the new node +SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%propagate_foreign_server.array_agg%';$$); + +-- verify that the aggregate is added top pg_dist_object on the new node +SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'propagate_foreign_server.array_agg'::regproc;$$); + SELECT citus_add_local_table_to_metadata('foreign_table'); ALTER TABLE foreign_table OWNER TO pg_monitor;