Propagate CREATE AGGREGATE commands

pull/5735/head
Ahmet Gedemenli 2022-02-23 13:17:52 +03:00
parent 1e876abc56
commit e1809af376
13 changed files with 456 additions and 19 deletions

View File

@ -0,0 +1,107 @@
/*-------------------------------------------------------------------------
*
* aggregate.c
* Commands for distributing AGGREGATE statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "nodes/parsenodes.h"
#include "utils/lsyscache.h"
/*
* PostprocessDefineAggregateStmt actually creates the plan we need to execute for
* aggregate propagation.
* This is the downside of using the locally created aggregate to get the sql statement.
*
* If the aggregate depends on any non-distributed relation, Citus can not distribute it.
* In order to not to prevent users from creating local aggregates on the coordinator,
* a WARNING message will be sent to the user about the case instead of erroring out.
*
* Besides creating the plan we also make sure all (new) dependencies of the aggregate
* are created on all nodes.
*/
List *
PostprocessDefineAggregateStmt(Node *node, const char *queryString)
{
QualifyTreeNode((Node *) node);
DefineStmt *stmt = castNode(DefineStmt, node);
if (!ShouldPropagate())
{
return NIL;
}
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
EnsureCoordinator();
EnsureSequentialMode(OBJECT_AGGREGATE);
ObjectAddress *undistributableDependency = GetUndistributableDependency(
&address);
if (undistributableDependency != NULL)
{
if (SupportedDependencyByCitus(undistributableDependency))
{
/*
* Citus can't distribute some relations as dependency, although those
* types as supported by Citus. So we can use get_rel_name directly
*/
RangeVar *aggRangeVar = makeRangeVarFromNameList(stmt->defnames);
char *aggName = aggRangeVar->relname;
char *dependentRelationName =
get_rel_name(undistributableDependency->objectId);
ereport(WARNING, (errmsg("Citus can't distribute aggregate \"%s\" having "
"dependency on non-distributed relation \"%s\"",
aggName, dependentRelationName),
errdetail("Aggregate will be created only locally"),
errhint("To distribute aggregate, distribute dependent "
"relations first. Then, re-create the aggregate")));
}
else
{
char *objectType = NULL;
#if PG_VERSION_NUM >= PG_VERSION_14
objectType = getObjectTypeDescription(undistributableDependency, false);
#else
objectType = getObjectTypeDescription(undistributableDependency);
#endif
ereport(WARNING, (errmsg("Citus can't distribute functions having "
"dependency on unsupported object of type \"%s\"",
objectType),
errdetail("Aggregate will be created only locally")));
}
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
List *commands = CreateFunctionDDLCommandsIdempotent(&address);
commands = lcons(DISABLE_DDL_PROPAGATION, commands);
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -43,11 +43,11 @@ static DistributeObjectOps Aggregate_AlterOwner = {
};
static DistributeObjectOps Aggregate_Define = {
.deparse = NULL,
.qualify = NULL,
.qualify = QualifyDefineAggregateStmt,
.preprocess = NULL,
.postprocess = NULL,
.postprocess = PostprocessDefineAggregateStmt,
.address = DefineAggregateStmtObjectAddress,
.markDistributed = false,
.markDistributed = true,
};
static DistributeObjectOps Aggregate_Drop = {
.deparse = DeparseDropFunctionStmt,

View File

@ -82,7 +82,6 @@ static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid
static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt);
static bool ShouldPropagateAlterFunction(const ObjectAddress *address);
static bool ShouldAddFunctionSignature(FunctionParameterMode mode);
static ObjectAddress * GetUndistributableDependency(ObjectAddress *functionAddress);
static ObjectAddress FunctionToObjectAddress(ObjectType objectType,
ObjectWithArgs *objectWithArgs,
bool missing_ok);
@ -1352,7 +1351,7 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
* GetUndistributableDependency checks whether object has any non-distributable
* dependency. If any one found, it will be returned.
*/
static ObjectAddress *
ObjectAddress *
GetUndistributableDependency(ObjectAddress *objectAddress)
{
List *dependencies = GetAllDependenciesForObject(objectAddress);
@ -1443,10 +1442,18 @@ DefineAggregateStmtObjectAddress(Node *node, bool missing_ok)
ObjectWithArgs *objectWithArgs = makeNode(ObjectWithArgs);
objectWithArgs->objname = stmt->defnames;
if (stmt->args != NIL)
{
FunctionParameter *funcParam = NULL;
foreach_ptr(funcParam, linitial(stmt->args))
{
objectWithArgs->objargs = lappend(objectWithArgs->objargs, funcParam->argType);
objectWithArgs->objargs = lappend(objectWithArgs->objargs,
funcParam->argType);
}
}
else
{
objectWithArgs->objargs = list_make1(makeTypeName("anyelement"));
}
return FunctionToObjectAddress(OBJECT_AGGREGATE, objectWithArgs, missing_ok);
@ -2019,10 +2026,10 @@ ShouldAddFunctionSignature(FunctionParameterMode mode)
/*
* FunctionToObjectAddress returns the ObjectAddress of a Function or Procedure based on
* its type and ObjectWithArgs describing the Function/Procedure. If missing_ok is set to
* false an error will be raised by postgres explaining the Function/Procedure could not
* be found.
* FunctionToObjectAddress returns the ObjectAddress of a Function, Procedure or
* Aggregate based on its type and ObjectWithArgs describing the
* Function/Procedure/Aggregate. If missing_ok is set to false an error will be
* raised by postgres explaining the Function/Procedure could not be found.
*/
static ObjectAddress
FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs,

View File

@ -0,0 +1,34 @@
/*-------------------------------------------------------------------------
*
* qualify_aggregate_stmts.c
* Functions specialized in fully qualifying all aggregate statements.
* These functions are dispatched from qualify.c
*
* Fully qualifying aggregate statements consists of adding the schema name
* to the subject of the types as well as any other branch of the parsetree.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "distributed/deparser.h"
#include "nodes/makefuncs.h"
#include "utils/lsyscache.h"
void
QualifyDefineAggregateStmt(Node *node)
{
DefineStmt *stmt = castNode(DefineStmt, node);
if (list_length(stmt->defnames) == 1)
{
char *objname = NULL;
Oid creationSchema = QualifiedNameGetCreationNamespace(stmt->defnames, &objname);
stmt->defnames = list_make2(makeString(get_namespace_name(creationSchema)),
linitial(stmt->defnames));
}
}

View File

@ -748,6 +748,11 @@ GetObjectTypeString(ObjectType objType)
{
switch (objType)
{
case OBJECT_AGGREGATE:
{
return "aggregate";
}
case OBJECT_COLLATION:
{
return "collation";

View File

@ -12,15 +12,22 @@ BEGIN
IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN
EXECUTE $cmd$
-- disable propagation to prevent EnsureCoordinator errors
-- the aggregate created here does not depend on Citus extension (yet)
-- since we add the dependency with the next command
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray);
COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
ELSE
EXECUTE $cmd$
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
COMMENT ON AGGREGATE array_cat_agg(anyarray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
END IF;

View File

@ -12,15 +12,22 @@ BEGIN
IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN
EXECUTE $cmd$
-- disable propagation to prevent EnsureCoordinator errors
-- the aggregate created here does not depend on Citus extension (yet)
-- since we add the dependency with the next command
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray);
COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
ELSE
EXECUTE $cmd$
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
COMMENT ON AGGREGATE array_cat_agg(anyarray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
END IF;

View File

@ -122,6 +122,9 @@ typedef enum SearchForeignKeyColumnFlags
} SearchForeignKeyColumnFlags;
/* aggregate.c - forward declarations */
extern List * PostprocessDefineAggregateStmt(Node *node, const char *queryString);
/* cluster.c - forward declarations */
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand,
ProcessUtilityContext processUtilityContext);
@ -264,6 +267,7 @@ extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessCreateFunctionStmt(Node *stmt,
const char *queryString);
extern ObjectAddress * GetUndistributableDependency(ObjectAddress *functionAddress);
extern ObjectAddress CreateFunctionStmtObjectAddress(Node *stmt,
bool missing_ok);
extern ObjectAddress DefineAggregateStmtObjectAddress(Node *stmt,

View File

@ -33,6 +33,9 @@ extern void QualifyTreeNode(Node *stmt);
extern char * DeparseTreeNode(Node *stmt);
extern List * DeparseTreeNodes(List *stmts);
/* forward declarations for qualify_aggregate_stmts.c */
extern void QualifyDefineAggregateStmt(Node *node);
/* forward declarations for deparse_attribute_stmts.c */
extern char * DeparseRenameAttributeStmt(Node *);

View File

@ -282,6 +282,21 @@ create aggregate binstragg(text, text)(
combinefunc=binstragg_combinefunc,
stype=text
);
-- verify that the aggregate is added into pg_dist_object, on each worker
SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.binstragg'::regproc;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
SELECT run_command_on_workers($$select count(*) from pg_aggregate where aggfnoid::text like '%binstragg%';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
select create_distributed_function('binstragg(text,text)');
create_distributed_function
---------------------------------------------------------------------
@ -549,7 +564,6 @@ SELECT id%5, first(val ORDER BY key), last(val ORDER BY key)
FROM aggdata GROUP BY id%5 ORDER BY id%5;
ERROR: unsupported aggregate function first
-- test aggregate with stype which is not a by-value datum
-- also test our handling of the aggregate not existing on workers
create function sumstring_sfunc(state text, x text)
returns text immutable language plpgsql as $$
begin return (state::float8 + x::float8)::text;
@ -561,9 +575,20 @@ create aggregate sumstring(text) (
combinefunc = sumstring_sfunc,
initcond = '0'
);
select sumstring(valf::text) from aggdata where valf is not null;
ERROR: function "aggregate_support.sumstring(text)" does not exist
CONTEXT: while executing command on localhost:xxxxx
-- verify that the aggregate is propagated
select aggfnoid from pg_aggregate where aggfnoid::text like '%sumstring%';
aggfnoid
---------------------------------------------------------------------
sumstring
(1 row)
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%sumstring%';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,aggregate_support.sumstring)
(localhost,57638,t,aggregate_support.sumstring)
(2 rows)
select create_distributed_function('sumstring(text)');
create_distributed_function
---------------------------------------------------------------------
@ -1042,5 +1067,122 @@ LEFT JOIN ref_table ON TRUE;
109
(1 row)
-- try createing aggregate having non-distributable dependency type
create table dummy_tbl (a int);
create function dummy_fnc(a dummy_tbl, d double precision) RETURNS dummy_tbl
AS $$SELECT 1;$$ LANGUAGE sql;
WARNING: Citus can't distribute function "dummy_fnc" having dependency on non-distributed relation "dummy_tbl"
DETAIL: Function will be created only locally
HINT: To distribute function, distribute dependent relations first. Then, re-create the function
-- should give warning and create aggregate local only
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
WARNING: Citus can't distribute aggregate "dependent_agg" having dependency on non-distributed relation "dummy_tbl"
DETAIL: Aggregate will be created only locally
HINT: To distribute aggregate, distribute dependent relations first. Then, re-create the aggregate
-- clear and try again with distributed table
DROP TABLE dummy_tbl CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to function dummy_fnc(dummy_tbl,double precision)
drop cascades to function dependent_agg(double precision)
create table dummy_tbl (a int);
SELECT create_distributed_table('dummy_tbl','a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
create function dummy_fnc(a dummy_tbl, d double precision) RETURNS dummy_tbl
AS $$SELECT 1;$$ LANGUAGE sql;
-- test in tx block
-- shouldn't distribute, as citus.create_object_propagation is set to deferred
BEGIN;
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
COMMIT;
-- verify not distributed
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"")
(localhost,57638,t,"")
(2 rows)
drop aggregate dependent_agg ( double precision);
-- now try with create_object_propagation = immediate
SET citus.create_object_propagation TO immediate;
-- should distribute, as citus.create_object_propagation is set to immediate
-- will switch to sequential mode
BEGIN;
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
COMMIT;
-- verify distributed
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,aggregate_support.dependent_agg)
(localhost,57638,t,aggregate_support.dependent_agg)
(2 rows)
drop aggregate dependent_agg ( double precision);
-- now try with create_object_propagation = automatic
SET citus.create_object_propagation TO automatic;
-- should distribute, as citus.create_object_propagation is set to automatic
-- will switch to sequential mode
BEGIN;
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
COMMIT;
-- verify distributed
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,aggregate_support.dependent_agg)
(localhost,57638,t,aggregate_support.dependent_agg)
(2 rows)
-- verify that the aggregate is added into pg_dist_object, on each worker
SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.dependent_agg'::regproc;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
RESET citus.create_object_propagation;
-- drop and test outside of tx block
drop aggregate dependent_agg (float8);
-- verify that the aggregate is removed from pg_dist_object, on each worker
SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.dependent_agg'::regproc;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,f,"ERROR: function ""aggregate_support.dependent_agg"" does not exist")
(localhost,57638,f,"ERROR: function ""aggregate_support.dependent_agg"" does not exist")
(2 rows)
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
--verify
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,aggregate_support.dependent_agg)
(localhost,57638,t,aggregate_support.dependent_agg)
(2 rows)
DROP TABLE dummy_tbl CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to function dummy_fnc(dummy_tbl,double precision)
drop cascades to function dependent_agg(double precision)
SET citus.create_object_propagation TO automatic;
begin;
create type typ1 as (a int);
create or replace function fnagg(a typ1, d double precision) RETURNS typ1 AS $$SELECT 1;$$LANGUAGE sql;
create aggregate dependent_agg (float8) (stype=typ1, sfunc=fnagg);
commit;
RESET citus.create_object_propagation;
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,aggregate_support.dependent_agg)
(localhost,57638,t,aggregate_support.dependent_agg)
(2 rows)
set client_min_messages to error;
drop schema aggregate_support cascade;

View File

@ -7,6 +7,15 @@ SELECT citus_remove_node('localhost', :worker_1_port);
(1 row)
-- not related, but added here to test propagation of aggregates
-- to newly added nodes
CREATE AGGREGATE array_agg (anynonarray)
(
sfunc = array_agg_transfn,
stype = internal,
finalfunc = array_agg_finalfn,
finalfunc_extra
);
-- create schema, extension and foreign server while the worker is removed
SET citus.enable_ddl_propagation TO OFF;
CREATE SCHEMA test_dependent_schema;
@ -28,6 +37,20 @@ NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipp
1
(1 row)
-- verify that the aggregate is propagated to the new node
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%propagate_foreign_server.array_agg%';$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57638,t,propagate_foreign_server.array_agg)
(1 row)
-- verify that the aggregate is added top pg_dist_object on the new node
SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'propagate_foreign_server.array_agg'::regproc;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57638,t,1)
(1 row)
SELECT citus_add_local_table_to_metadata('foreign_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
@ -163,4 +186,6 @@ ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
\c - - - :master_port
DROP SCHEMA propagate_foreign_server CASCADE;
NOTICE: drop cascades to extension postgres_fdw
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to function propagate_foreign_server.array_agg(anynonarray)
drop cascades to extension postgres_fdw

View File

@ -160,6 +160,10 @@ create aggregate binstragg(text, text)(
combinefunc=binstragg_combinefunc,
stype=text
);
-- verify that the aggregate is added into pg_dist_object, on each worker
SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.binstragg'::regproc;$$);
SELECT run_command_on_workers($$select count(*) from pg_aggregate where aggfnoid::text like '%binstragg%';$$);
select create_distributed_function('binstragg(text,text)');
@ -313,7 +317,6 @@ SELECT id%5, first(val ORDER BY key), last(val ORDER BY key)
FROM aggdata GROUP BY id%5 ORDER BY id%5;
-- test aggregate with stype which is not a by-value datum
-- also test our handling of the aggregate not existing on workers
create function sumstring_sfunc(state text, x text)
returns text immutable language plpgsql as $$
begin return (state::float8 + x::float8)::text;
@ -326,8 +329,10 @@ create aggregate sumstring(text) (
combinefunc = sumstring_sfunc,
initcond = '0'
);
-- verify that the aggregate is propagated
select aggfnoid from pg_aggregate where aggfnoid::text like '%sumstring%';
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%sumstring%';$$);
select sumstring(valf::text) from aggdata where valf is not null;
select create_distributed_function('sumstring(text)');
select sumstring(valf::text) from aggdata where valf is not null;
@ -533,5 +538,79 @@ SELECT floor(AVG(COALESCE(agg_col, 10)))
FROM dist_table
LEFT JOIN ref_table ON TRUE;
-- try createing aggregate having non-distributable dependency type
create table dummy_tbl (a int);
create function dummy_fnc(a dummy_tbl, d double precision) RETURNS dummy_tbl
AS $$SELECT 1;$$ LANGUAGE sql;
-- should give warning and create aggregate local only
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
-- clear and try again with distributed table
DROP TABLE dummy_tbl CASCADE;
create table dummy_tbl (a int);
SELECT create_distributed_table('dummy_tbl','a');
create function dummy_fnc(a dummy_tbl, d double precision) RETURNS dummy_tbl
AS $$SELECT 1;$$ LANGUAGE sql;
-- test in tx block
-- shouldn't distribute, as citus.create_object_propagation is set to deferred
BEGIN;
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
COMMIT;
-- verify not distributed
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
drop aggregate dependent_agg ( double precision);
-- now try with create_object_propagation = immediate
SET citus.create_object_propagation TO immediate;
-- should distribute, as citus.create_object_propagation is set to immediate
-- will switch to sequential mode
BEGIN;
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
COMMIT;
-- verify distributed
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
drop aggregate dependent_agg ( double precision);
-- now try with create_object_propagation = automatic
SET citus.create_object_propagation TO automatic;
-- should distribute, as citus.create_object_propagation is set to automatic
-- will switch to sequential mode
BEGIN;
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
COMMIT;
-- verify distributed
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
-- verify that the aggregate is added into pg_dist_object, on each worker
SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.dependent_agg'::regproc;$$);
RESET citus.create_object_propagation;
-- drop and test outside of tx block
drop aggregate dependent_agg (float8);
-- verify that the aggregate is removed from pg_dist_object, on each worker
SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'aggregate_support.dependent_agg'::regproc;$$);
create aggregate dependent_agg (float8) (stype=dummy_tbl, sfunc=dummy_fnc);
--verify
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
DROP TABLE dummy_tbl CASCADE;
SET citus.create_object_propagation TO automatic;
begin;
create type typ1 as (a int);
create or replace function fnagg(a typ1, d double precision) RETURNS typ1 AS $$SELECT 1;$$LANGUAGE sql;
create aggregate dependent_agg (float8) (stype=typ1, sfunc=fnagg);
commit;
RESET citus.create_object_propagation;
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%dependent_agg%';$$);
set client_min_messages to error;
drop schema aggregate_support cascade;

View File

@ -4,6 +4,16 @@ SET search_path TO propagate_foreign_server;
-- remove node to add later
SELECT citus_remove_node('localhost', :worker_1_port);
-- not related, but added here to test propagation of aggregates
-- to newly added nodes
CREATE AGGREGATE array_agg (anynonarray)
(
sfunc = array_agg_transfn,
stype = internal,
finalfunc = array_agg_finalfn,
finalfunc_extra
);
-- create schema, extension and foreign server while the worker is removed
SET citus.enable_ddl_propagation TO OFF;
CREATE SCHEMA test_dependent_schema;
@ -20,6 +30,13 @@ CREATE FOREIGN TABLE foreign_table (
OPTIONS (schema_name 'test_dependent_schema', table_name 'foreign_table_test');
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId=>0);
-- verify that the aggregate is propagated to the new node
SELECT run_command_on_workers($$select aggfnoid from pg_aggregate where aggfnoid::text like '%propagate_foreign_server.array_agg%';$$);
-- verify that the aggregate is added top pg_dist_object on the new node
SELECT run_command_on_workers($$SELECT count(*) from citus.pg_dist_object where objid = 'propagate_foreign_server.array_agg'::regproc;$$);
SELECT citus_add_local_table_to_metadata('foreign_table');
ALTER TABLE foreign_table OWNER TO pg_monitor;