Add the necessary changes for rebalance strategies on enterprise (#3325)

This commit adds the SQL and C changes necessary to support custom rebalance
strategies in the Enterprise version of Citus.
pull/3327/head
Jelte Fennema 2019-12-19 15:23:08 +01:00 committed by GitHub
parent c9ceff7d78
commit b655c02352
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 928 additions and 30 deletions

View File

@ -71,9 +71,6 @@ static uint64 DistributedTableSize(Oid relationId, char *sizeQuery);
static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
char *sizeQuery);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static StringInfo GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId,
List *shardIntervalList,
char *sizeQuery);
static void ErrorIfNotSuitableToGetSize(Oid relationId);
@ -212,7 +209,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(relationId,
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(
shardIntervalsOnNode,
sizeQuery);
@ -328,19 +325,14 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
/*
* GenerateSizeQueryOnMultiplePlacements generates a select size query to get
* size of multiple tables from the relation with distributedRelationId. Note
* that, different size functions supported by PG are also supported by this
* function changing the size query given as the last parameter to function.
* Format of sizeQuery is pg_*_size(%s). Examples of it can be found in the
* master_protocol.h
* size of multiple tables. Note that, different size functions supported by PG
* are also supported by this function changing the size query given as the
* last parameter to function. Format of sizeQuery is pg_*_size(%s). Examples
* of it can be found in the master_protocol.h
*/
static StringInfo
GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId, List *shardIntervalList,
char *sizeQuery)
StringInfo
GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery)
{
Oid schemaId = get_rel_namespace(distributedRelationId);
char *schemaName = get_namespace_name(schemaId);
StringInfo selectQuery = makeStringInfo();
ListCell *shardIntervalCell = NULL;
@ -350,7 +342,9 @@ GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId, List *shardInte
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
char *shardName = get_rel_name(distributedRelationId);
Oid schemaId = get_rel_namespace(shardInterval->relationId);
char *schemaName = get_namespace_name(schemaId);
char *shardName = get_rel_name(shardInterval->relationId);
AppendShardIdToName(&shardName, shardId);
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);

View File

@ -11,10 +11,194 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "catalog/pg_proc.h"
#include "distributed/enterprise.h"
#include "utils/syscache.h"
static void EnsureShardCostUDF(Oid functionOid);
static void EnsureNodeCapacityUDF(Oid functionOid);
static void EnsureShardAllowedOnNodeUDF(Oid functionOid);
NOT_SUPPORTED_IN_COMMUNITY(rebalance_table_shards);
NOT_SUPPORTED_IN_COMMUNITY(replicate_table_shards);
NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_table_shards_plan);
NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_progress);
NOT_SUPPORTED_IN_COMMUNITY(master_drain_node);
NOT_SUPPORTED_IN_COMMUNITY(citus_shard_cost_by_disk_size);
PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check);
PG_FUNCTION_INFO_V1(citus_validate_rebalance_strategy_functions);
/*
* citus_rebalance_strategy_enterprise_check is trigger function, intended for
* use in prohibiting writes to pg_dist_rebalance_strategy in Citus Community.
*/
Datum
pg_dist_rebalance_strategy_enterprise_check(PG_FUNCTION_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot write to pg_dist_rebalance_strategy"),
errdetail(
"Citus Community Edition does not support the use of "
"custom rebalance strategies."),
errhint(
"To learn more about using advanced rebalancing schemes "
"with Citus, please contact us at "
"https://citusdata.com/about/contact_us")));
}
/*
* citus_validate_rebalance_strategy_functions checks all the functions for
* their correct signature.
*
* SQL signature:
*
* citus_validate_rebalance_strategy_functions(
* shard_cost_function regproc,
* node_capacity_function regproc,
* shard_allowed_on_node_function regproc,
* ) RETURNS VOID
*/
Datum
citus_validate_rebalance_strategy_functions(PG_FUNCTION_ARGS)
{
EnsureShardCostUDF(PG_GETARG_OID(0));
EnsureNodeCapacityUDF(PG_GETARG_OID(1));
EnsureShardAllowedOnNodeUDF(PG_GETARG_OID(2));
PG_RETURN_VOID();
}
/*
* EnsureShardCostUDF checks that the UDF matching the oid has the correct
* signature to be used as a ShardCost function. The expected signature is:
*
* shard_cost(shardid bigint) returns float4
*/
static void
EnsureShardCostUDF(Oid functionOid)
{
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
if (!HeapTupleIsValid(proctup))
{
ereport(ERROR, (errmsg("cache lookup failed for shard_cost_function with oid %u",
functionOid)));
}
Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(proctup);
char *name = NameStr(procForm->proname);
if (procForm->pronargs != 1)
{
ereport(ERROR, (errmsg("signature for shard_cost_function is incorrect"),
errdetail(
"number of arguments of %s should be 1, not %i",
name, procForm->pronargs)));
}
if (procForm->proargtypes.values[0] != INT8OID)
{
ereport(ERROR, (errmsg("signature for shard_cost_function is incorrect"),
errdetail(
"argument type of %s should be bigint", name)));
}
if (procForm->prorettype != FLOAT4OID)
{
ereport(ERROR, (errmsg("signature for shard_cost_function is incorrect"),
errdetail("return type of %s should be real", name)));
}
ReleaseSysCache(proctup);
}
/*
* EnsureNodeCapacityUDF checks that the UDF matching the oid has the correct
* signature to be used as a NodeCapacity function. The expected signature is:
*
* node_capacity(nodeid int) returns float4
*/
static void
EnsureNodeCapacityUDF(Oid functionOid)
{
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
if (!HeapTupleIsValid(proctup))
{
ereport(ERROR, (errmsg(
"cache lookup failed for node_capacity_function with oid %u",
functionOid)));
}
Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(proctup);
char *name = NameStr(procForm->proname);
if (procForm->pronargs != 1)
{
ereport(ERROR, (errmsg("signature for node_capacity_function is incorrect"),
errdetail(
"number of arguments of %s should be 1, not %i",
name, procForm->pronargs)));
}
if (procForm->proargtypes.values[0] != INT4OID)
{
ereport(ERROR, (errmsg("signature for node_capacity_function is incorrect"),
errdetail("argument type of %s should be int", name)));
}
if (procForm->prorettype != FLOAT4OID)
{
ereport(ERROR, (errmsg("signature for node_capacity_function is incorrect"),
errdetail("return type of %s should be real", name)));
}
ReleaseSysCache(proctup);
}
/*
* EnsureNodeCapacityUDF checks that the UDF matching the oid has the correct
* signature to be used as a NodeCapacity function. The expected signature is:
*
* shard_allowed_on_node(shardid bigint, nodeid int) returns boolean
*/
static void
EnsureShardAllowedOnNodeUDF(Oid functionOid)
{
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
if (!HeapTupleIsValid(proctup))
{
ereport(ERROR, (errmsg(
"cache lookup failed for shard_allowed_on_node_function with oid %u",
functionOid)));
}
Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(proctup);
char *name = NameStr(procForm->proname);
if (procForm->pronargs != 2)
{
ereport(ERROR, (errmsg(
"signature for shard_allowed_on_node_function is incorrect"),
errdetail(
"number of arguments of %s should be 2, not %i",
name, procForm->pronargs)));
}
if (procForm->proargtypes.values[0] != INT8OID)
{
ereport(ERROR, (errmsg(
"signature for shard_allowed_on_node_function is incorrect"),
errdetail(
"type of first argument of %s should be bigint", name)));
}
if (procForm->proargtypes.values[1] != INT4OID)
{
ereport(ERROR, (errmsg(
"signature for shard_allowed_on_node_function is incorrect"),
errdetail(
"type of second argument of %s should be int", name)));
}
if (procForm->prorettype != BOOLOID)
{
ereport(ERROR, (errmsg(
"signature for shard_allowed_on_node_function is incorrect"),
errdetail(
"return type of %s should be boolean", name)));
}
ReleaseSysCache(proctup);
}

View File

@ -114,6 +114,7 @@ typedef struct MetadataCacheData
bool extensionLoaded;
Oid distShardRelationId;
Oid distPlacementRelationId;
Oid distRebalanceStrategyRelationId;
Oid distNodeRelationId;
Oid distNodeNodeIdIndexId;
Oid distLocalGroupRelationId;
@ -1860,6 +1861,17 @@ DistLocalGroupIdRelationId(void)
}
/* return oid of pg_dist_rebalance_strategy relation */
Oid
DistRebalanceStrategyRelationId(void)
{
CachedRelationLookup("pg_dist_rebalance_strategy",
&MetadataCache.distRebalanceStrategyRelationId);
return MetadataCache.distRebalanceStrategyRelationId;
}
/* return the oid of citus namespace */
Oid
CitusCatalogNamespaceId(void)

View File

@ -12,3 +12,73 @@ DROP INDEX pg_dist_colocation_configuration_index;
CREATE INDEX pg_dist_colocation_configuration_index
ON pg_dist_colocation USING btree(distributioncolumntype, shardcount, replicationfactor, distributioncolumncollation);
CREATE TABLE citus.pg_dist_rebalance_strategy(
name name NOT NULL,
default_strategy boolean NOT NULL DEFAULT false,
shard_cost_function regproc NOT NULL,
node_capacity_function regproc NOT NULL,
shard_allowed_on_node_function regproc NOT NULL,
default_threshold float4 NOT NULL,
minimum_threshold float4 NOT NULL DEFAULT 0,
UNIQUE(name)
);
ALTER TABLE citus.pg_dist_rebalance_strategy SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.pg_dist_rebalance_strategy TO public;
#include "udfs/citus_validate_rebalance_strategy_functions/9.2-1.sql"
#include "udfs/pg_dist_rebalance_strategy_trigger_func/9.2-1.sql"
CREATE TRIGGER pg_dist_rebalance_strategy_validation_trigger
BEFORE INSERT OR UPDATE ON pg_dist_rebalance_strategy
FOR EACH ROW EXECUTE PROCEDURE citus_internal.pg_dist_rebalance_strategy_trigger_func();
#include "udfs/citus_add_rebalance_strategy/9.2-1.sql"
#include "udfs/citus_set_default_rebalance_strategy/9.2-1.sql"
#include "udfs/citus_shard_cost_1/9.2-1.sql"
#include "udfs/citus_shard_cost_by_disk_size/9.2-1.sql"
#include "udfs/citus_node_capacity_1/9.2-1.sql"
#include "udfs/citus_shard_allowed_on_node_true/9.2-1.sql"
INSERT INTO
pg_catalog.pg_dist_rebalance_strategy(
name,
default_strategy,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold,
minimum_threshold
) VALUES (
'by_shard_count',
true,
'citus_shard_cost_1',
'citus_node_capacity_1',
'citus_shard_allowed_on_node_true',
0,
0
), (
'by_disk_size',
false,
'citus_shard_cost_by_disk_size',
'citus_node_capacity_1',
'citus_shard_allowed_on_node_true',
0.1,
0.01
);
CREATE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check()
RETURNS TRIGGER
LANGUAGE C
AS 'MODULE_PATHNAME';
CREATE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger
BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE ON pg_dist_rebalance_strategy
FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check();
#include "udfs/master_drain_node/9.2-1.sql"
#include "udfs/rebalance_table_shards/9.2-1.sql"
#include "udfs/get_rebalance_table_shards_plan/9.2-1.sql"
#include "udfs/citus_prepare_pg_upgrade/9.2-1.sql"
#include "udfs/citus_finish_pg_upgrade/9.2-1.sql"

View File

@ -0,0 +1,28 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy(
name name,
shard_cost_function regproc,
node_capacity_function regproc,
shard_allowed_on_node_function regproc,
default_threshold float4,
minimum_threshold float4 DEFAULT 0
)
RETURNS VOID AS $$
INSERT INTO
pg_catalog.pg_dist_rebalance_strategy(
name,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold,
minimum_threshold
) VALUES (
name,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold,
minimum_threshold
);
$$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4)
IS 'adds a new rebalance strategy which can be used when rebalancing shards or draining nodes';

View File

@ -0,0 +1,28 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy(
name name,
shard_cost_function regproc,
node_capacity_function regproc,
shard_allowed_on_node_function regproc,
default_threshold float4,
minimum_threshold float4 DEFAULT 0
)
RETURNS VOID AS $$
INSERT INTO
pg_catalog.pg_dist_rebalance_strategy(
name,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold,
minimum_threshold
) VALUES (
name,
shard_cost_function,
node_capacity_function,
shard_allowed_on_node_function,
default_threshold,
minimum_threshold
);
$$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4)
IS 'adds a new rebalance strategy which can be used when rebalancing shards or draining nodes';

View File

@ -0,0 +1,113 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
DECLARE
table_name regclass;
command text;
trigger_name text;
BEGIN
--
-- restore citus catalog tables
--
INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition;
INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard;
INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement;
INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata;
INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node;
INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group;
INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
-- enterprise catalog tables
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
name,
default_strategy,
shard_cost_function::regprocedure::regproc,
node_capacity_function::regprocedure::regproc,
shard_allowed_on_node_function::regprocedure::regproc,
default_threshold,
minimum_threshold
FROM public.pg_dist_rebalance_strategy;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
--
-- drop backup tables
--
DROP TABLE public.pg_dist_authinfo;
DROP TABLE public.pg_dist_colocation;
DROP TABLE public.pg_dist_local_group;
DROP TABLE public.pg_dist_node;
DROP TABLE public.pg_dist_node_metadata;
DROP TABLE public.pg_dist_partition;
DROP TABLE public.pg_dist_placement;
DROP TABLE public.pg_dist_poolinfo;
DROP TABLE public.pg_dist_shard;
DROP TABLE public.pg_dist_transaction;
--
-- reset sequences
--
PERFORM setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false);
PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false);
PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false);
--
-- register triggers
--
FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition
LOOP
trigger_name := 'truncate_trigger_' || table_name::oid;
command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()';
EXECUTE command;
command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name);
EXECUTE command;
END LOOP;
--
-- set dependencies
--
INSERT INTO pg_depend
SELECT
'pg_class'::regclass::oid as classid,
p.logicalrelid::regclass::oid as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'n' as deptype
FROM pg_catalog.pg_dist_partition p;
-- restore pg_dist_object from the stable identifiers
-- DELETE/INSERT to avoid primary key violations
WITH old_records AS (
DELETE FROM
citus.pg_dist_object
RETURNING
type,
object_names,
object_args,
distribution_argument_index,
colocationid
)
INSERT INTO citus.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid)
SELECT
address.classid,
address.objid,
address.objsubid,
naming.distribution_argument_index,
naming.colocationid
FROM
old_records naming,
pg_get_object_address(naming.type, naming.object_names, naming.object_args) address;
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade()
IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade';

View File

@ -23,6 +23,18 @@ BEGIN
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
name,
default_strategy,
shard_cost_function::regprocedure::regproc,
node_capacity_function::regprocedure::regproc,
shard_allowed_on_node_function::regprocedure::regproc,
default_threshold,
minimum_threshold
FROM public.pg_dist_rebalance_strategy;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
--
-- drop backup tables
--

View File

@ -0,0 +1,4 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_node_capacity_1(int)
RETURNS float4 AS $$ SELECT 1.0::float4 $$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_node_capacity_1(int)
IS 'a node capacity function for use by the rebalance algorithm that always returns 1';

View File

@ -0,0 +1,4 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_node_capacity_1(int)
RETURNS float4 AS $$ SELECT 1.0::float4 $$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_node_capacity_1(int)
IS 'a node capacity function for use by the rebalance algorithm that always returns 1';

View File

@ -0,0 +1,38 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
BEGIN
--
-- backup citus catalog tables
--
CREATE TABLE public.pg_dist_partition AS SELECT * FROM pg_catalog.pg_dist_partition;
CREATE TABLE public.pg_dist_shard AS SELECT * FROM pg_catalog.pg_dist_shard;
CREATE TABLE public.pg_dist_placement AS SELECT * FROM pg_catalog.pg_dist_placement;
CREATE TABLE public.pg_dist_node_metadata AS SELECT * FROM pg_catalog.pg_dist_node_metadata;
CREATE TABLE public.pg_dist_node AS SELECT * FROM pg_catalog.pg_dist_node;
CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group;
CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction;
CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation;
-- enterprise catalog tables
CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT
name,
default_strategy,
shard_cost_function::regprocedure::text,
node_capacity_function::regprocedure::text,
shard_allowed_on_node_function::regprocedure::text,
default_threshold,
minimum_threshold
FROM pg_catalog.pg_dist_rebalance_strategy;
-- store upgrade stable identifiers on pg_dist_object catalog
UPDATE citus.pg_dist_object
SET (type, object_names, object_args) = (SELECT * FROM pg_identify_object_as_address(classid, objid, objsubid));
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_prepare_pg_upgrade()
IS 'perform tasks to copy citus settings to a location that could later be restored after pg_upgrade is done';

View File

@ -18,6 +18,15 @@ BEGIN
-- enterprise catalog tables
CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT
name,
default_strategy,
shard_cost_function::regprocedure::text,
node_capacity_function::regprocedure::text,
shard_allowed_on_node_function::regprocedure::text,
default_threshold,
minimum_threshold
FROM pg_catalog.pg_dist_rebalance_strategy;
-- store upgrade stable identifiers on pg_dist_object catalog
UPDATE citus.pg_dist_object

View File

@ -0,0 +1,18 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_set_default_rebalance_strategy(
name text
)
RETURNS VOID
STRICT
AS $$
BEGIN
LOCK TABLE pg_dist_rebalance_strategy IN SHARE ROW EXCLUSIVE MODE;
IF NOT EXISTS (SELECT 1 FROM pg_dist_rebalance_strategy t WHERE t.name = $1) THEN
RAISE EXCEPTION 'strategy with specified name does not exist';
END IF;
UPDATE pg_dist_rebalance_strategy SET default_strategy = false WHERE default_strategy = true;
UPDATE pg_dist_rebalance_strategy t SET default_strategy = true WHERE t.name = $1;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION pg_catalog.citus_set_default_rebalance_strategy(text)
IS 'changes the default rebalance strategy to the one with the specified name';

View File

@ -0,0 +1,18 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_set_default_rebalance_strategy(
name text
)
RETURNS VOID
STRICT
AS $$
BEGIN
LOCK TABLE pg_dist_rebalance_strategy IN SHARE ROW EXCLUSIVE MODE;
IF NOT EXISTS (SELECT 1 FROM pg_dist_rebalance_strategy t WHERE t.name = $1) THEN
RAISE EXCEPTION 'strategy with specified name does not exist';
END IF;
UPDATE pg_dist_rebalance_strategy SET default_strategy = false WHERE default_strategy = true;
UPDATE pg_dist_rebalance_strategy t SET default_strategy = true WHERE t.name = $1;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION pg_catalog.citus_set_default_rebalance_strategy(text)
IS 'changes the default rebalance strategy to the one with the specified name';

View File

@ -0,0 +1,5 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_allowed_on_node_true(bigint, int)
RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_shard_allowed_on_node_true(bigint,int)
IS 'a shard_allowed_on_node_function for use by the rebalance algorithm that always returns true';

View File

@ -0,0 +1,5 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_allowed_on_node_true(bigint, int)
RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_shard_allowed_on_node_true(bigint,int)
IS 'a shard_allowed_on_node_function for use by the rebalance algorithm that always returns true';

View File

@ -0,0 +1,4 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_cost_1(bigint)
RETURNS float4 AS $$ SELECT 1.0::float4 $$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_shard_cost_1(bigint)
IS 'a shard cost function for use by the rebalance algorithm that always returns 1';

View File

@ -0,0 +1,4 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_cost_1(bigint)
RETURNS float4 AS $$ SELECT 1.0::float4 $$ LANGUAGE sql;
COMMENT ON FUNCTION pg_catalog.citus_shard_cost_1(bigint)
IS 'a shard cost function for use by the rebalance algorithm that always returns 1';

View File

@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_cost_by_disk_size(bigint)
RETURNS float4
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_shard_cost_by_disk_size(bigint)
IS 'a shard cost function for use by the rebalance algorithm that returns the disk size in bytes for the specified shard and the shards that are colocated with it';

View File

@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_cost_by_disk_size(bigint)
RETURNS float4
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_shard_cost_by_disk_size(bigint)
IS 'a shard cost function for use by the rebalance algorithm that returns the disk size in bytes for the specified shard and the shards that are colocated with it';

View File

@ -0,0 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_validate_rebalance_strategy_functions(
shard_cost_function regproc,
node_capacity_function regproc,
shard_allowed_on_node_function regproc
)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
IS 'internal function used by citus to validate signatures of functions used in rebalance strategy';

View File

@ -0,0 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_validate_rebalance_strategy_functions(
shard_cost_function regproc,
node_capacity_function regproc,
shard_allowed_on_node_function regproc
)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_validate_rebalance_strategy_functions(regproc,regproc,regproc)
IS 'internal function used by citus to validate signatures of functions used in rebalance strategy';

View File

@ -0,0 +1,26 @@
-- get_rebalance_table_shards_plan shows the actual events that will be performed
-- if a rebalance operation will be performed with the same arguments, which allows users
-- to understand the impact of the change overall availability of the application and
-- network trafic.
--
DROP FUNCTION pg_catalog.get_rebalance_table_shards_plan;
CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan(
relation regclass default NULL,
threshold float4 default NULL,
max_shard_moves int default 1000000,
excluded_shard_list bigint[] default '{}',
drain_only boolean default false,
rebalance_strategy name default NULL
)
RETURNS TABLE (table_name regclass,
shardid bigint,
shard_size bigint,
sourcename text,
sourceport int,
targetname text,
targetport int)
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name)
IS 'returns the list of shard placement moves to be done on a rebalance operation';

View File

@ -6,10 +6,12 @@
DROP FUNCTION pg_catalog.get_rebalance_table_shards_plan;
CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan(
relation regclass default NULL,
threshold float4 default 0,
threshold float4 default NULL,
max_shard_moves int default 1000000,
excluded_shard_list bigint[] default '{}',
drain_only boolean default false)
drain_only boolean default false,
rebalance_strategy name default NULL
)
RETURNS TABLE (table_name regclass,
shardid bigint,
shard_size bigint,
@ -19,5 +21,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan(
targetport int)
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean)
COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name)
IS 'returns the list of shard placement moves to be done on a rebalance operation';

View File

@ -0,0 +1,14 @@
DROP FUNCTION pg_catalog.master_drain_node;
CREATE FUNCTION pg_catalog.master_drain_node(
nodename text,
nodeport integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto',
rebalance_strategy name default NULL
)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$master_drain_node$$;
COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode,name)
IS 'mark a node to be drained of data and actually drain it as well';
REVOKE ALL ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode,name) FROM PUBLIC;

View File

@ -1,11 +1,14 @@
DROP FUNCTION pg_catalog.master_drain_node;
CREATE FUNCTION pg_catalog.master_drain_node(
nodename text,
nodeport integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
shard_transfer_mode citus.shard_transfer_mode default 'auto',
rebalance_strategy name default NULL
)
RETURNS VOID
LANGUAGE C STRICT
LANGUAGE C
AS 'MODULE_PATHNAME', $$master_drain_node$$;
COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode)
COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode,name)
IS 'mark a node to be drained of data and actually drain it as well';
REVOKE ALL ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode) FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode,name) FROM PUBLIC;

View File

@ -0,0 +1,28 @@
-- Ensures that only a single default strategy is possible
CREATE OR REPLACE FUNCTION citus_internal.pg_dist_rebalance_strategy_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
-- citus_add_rebalance_strategy also takes out a ShareRowExclusiveLock
LOCK TABLE pg_dist_rebalance_strategy IN SHARE ROW EXCLUSIVE MODE;
PERFORM citus_validate_rebalance_strategy_functions(
NEW.shard_cost_function,
NEW.node_capacity_function,
NEW.shard_allowed_on_node_function);
IF NEW.default_threshold < NEW.minimum_threshold THEN
RAISE EXCEPTION 'default_threshold cannot be smaller than minimum_threshold';
END IF;
IF NOT NEW.default_strategy THEN
RETURN NEW;
END IF;
IF TG_OP = 'UPDATE' AND NEW.default_strategy = OLD.default_strategy THEN
return NEW;
END IF;
IF EXISTS (SELECT 1 FROM pg_dist_rebalance_strategy WHERE default_strategy) THEN
RAISE EXCEPTION 'there cannot be two default strategies';
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

View File

@ -0,0 +1,28 @@
-- Ensures that only a single default strategy is possible
CREATE OR REPLACE FUNCTION citus_internal.pg_dist_rebalance_strategy_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
-- citus_add_rebalance_strategy also takes out a ShareRowExclusiveLock
LOCK TABLE pg_dist_rebalance_strategy IN SHARE ROW EXCLUSIVE MODE;
PERFORM citus_validate_rebalance_strategy_functions(
NEW.shard_cost_function,
NEW.node_capacity_function,
NEW.shard_allowed_on_node_function);
IF NEW.default_threshold < NEW.minimum_threshold THEN
RAISE EXCEPTION 'default_threshold cannot be smaller than minimum_threshold';
END IF;
IF NOT NEW.default_strategy THEN
RETURN NEW;
END IF;
IF TG_OP = 'UPDATE' AND NEW.default_strategy = OLD.default_strategy THEN
return NEW;
END IF;
IF EXISTS (SELECT 1 FROM pg_dist_rebalance_strategy WHERE default_strategy) THEN
RAISE EXCEPTION 'there cannot be two default strategies';
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

View File

@ -0,0 +1,18 @@
-- rebalance_table_shards uses the shard rebalancer's C UDF functions to rebalance
-- shards of the given relation.
--
DROP FUNCTION pg_catalog.rebalance_table_shards;
CREATE OR REPLACE FUNCTION pg_catalog.rebalance_table_shards(
relation regclass default NULL,
threshold float4 default NULL,
max_shard_moves int default 1000000,
excluded_shard_list bigint[] default '{}',
shard_transfer_mode citus.shard_transfer_mode default 'auto',
drain_only boolean default false,
rebalance_strategy name default NULL
)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.rebalance_table_shards(regclass, float4, int, bigint[], citus.shard_transfer_mode, boolean, name)
IS 'rebalance the shards of the given table across the worker nodes (including colocated shards of other tables)';

View File

@ -4,13 +4,15 @@
DROP FUNCTION pg_catalog.rebalance_table_shards;
CREATE OR REPLACE FUNCTION pg_catalog.rebalance_table_shards(
relation regclass default NULL,
threshold float4 default 0,
threshold float4 default NULL,
max_shard_moves int default 1000000,
excluded_shard_list bigint[] default '{}',
shard_transfer_mode citus.shard_transfer_mode default 'auto',
drain_only boolean default false)
drain_only boolean default false,
rebalance_strategy name default NULL
)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.rebalance_table_shards(regclass, float4, int, bigint[], citus.shard_transfer_mode, boolean)
COMMENT ON FUNCTION pg_catalog.rebalance_table_shards(regclass, float4, int, bigint[], citus.shard_transfer_mode, boolean, name)
IS 'rebalance the shards of the given table across the worker nodes (including colocated shards of other tables)';

View File

@ -0,0 +1,53 @@
/*-------------------------------------------------------------------------
*
* argutils.h
*
* Macros to help with argument parsing in UDFs.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
/*
* PG_ENSURE_ARGNOTNULL ensures that a UDF argument is not NULL and throws an
* error otherwise. This is useful for non STRICT UDFs where only some
* arguments are allowed to be NULL.
*/
#define PG_ENSURE_ARGNOTNULL(argIndex, argName) \
if (PG_ARGISNULL(argIndex)) \
{ \
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), \
errmsg("%s cannot be NULL", argName))); \
}
/*
* PG_GETARG_TEXT_TO_CSTRING is the same as PG_GETARG_TEXT_P, but instead of
* text* it returns char*. Just like most other PG_GETARG_* macros this assumes
* the argument is not NULL.
*/
#define PG_GETARG_TEXT_TO_CSTRING(argIndex) \
text_to_cstring(PG_GETARG_TEXT_P(argIndex))
/*
* PG_GETARG_TEXT_TO_CSTRING_OR_NULL is the same as PG_GETARG_TEXT_TO_CSTRING,
* but it supports the case where the argument is NULL. In this case it will
* return a NULL pointer.
*/
#define PG_GETARG_TEXT_TO_CSTRING_OR_NULL(argIndex) \
PG_ARGISNULL(argIndex) ? NULL : PG_GETARG_TEXT_TO_CSTRING(argIndex)
/*
* PG_GETARG_NAME_OR_NULL is the same as PG_GETARG_NAME, but it supports the
* case where the argument is NULL. In this case it will return a NULL pointer.
*/
#define PG_GETARG_NAME_OR_NULL(argIndex) \
PG_ARGISNULL(argIndex) ? NULL : PG_GETARG_NAME(argIndex)
/*
* PG_GETARG_FLOAT4_OR is the same as PG_GETARG_FLOAT4, but it supports the
* case where the argument is NULL. In that case it will return the provided
* fallback.
*/
#define PG_GETARG_FLOAT4_OR_DEFAULT(argIndex, fallback) \
PG_ARGISNULL(argIndex) ? (fallback) : PG_GETARG_FLOAT4(argIndex)

View File

@ -112,6 +112,8 @@ extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk);
extern List * BuildShardPlacementList(ShardInterval *shardInterval);
extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
char *sizeQuery);
/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,

View File

@ -165,6 +165,7 @@ extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void);
extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void);
extern Oid DistRebalanceStrategyRelationId(void);
extern Oid DistLocalGroupIdRelationId(void);
extern Oid DistObjectRelationId(void);
extern Oid DistEnabledCustomAggregatesId(void);

View File

@ -0,0 +1,54 @@
/*-------------------------------------------------------------------------
*
* pg_dist_rebalance_strategy.h
* definition of the "rebalance strategy" relation (pg_dist_rebalance_strategy).
*
* This table contains all the available strategies for rebalancing.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_REBALANCE_STRATEGY_H
#define PG_DIST_REBALANCE_STRATEGY_H
#include "postgres.h"
/* ----------------
* pg_dist_shard definition.
* ----------------
*/
typedef struct FormData_pg_dist_rebalance_strategy
{
NameData name; /* user readable name of the strategy */
bool default_strategy; /* if this strategy is the default strategy */
Oid shardCostFunction; /* function to calculate the shard cost */
Oid nodeCapacityFunction; /* function to get the capacity of a node */
Oid shardAllowedOnNodeFunction; /* function to check if shard is allowed on node */
float4 defaultThreshold; /* default threshold that is used */
float4 minimumThreshold; /* minimum threshold that is allowed */
} FormData_pg_dist_rebalance_strategy;
/* ----------------
* Form_pg_dist_shards corresponds to a pointer to a tuple with
* the format of pg_dist_shards relation.
* ----------------
*/
typedef FormData_pg_dist_rebalance_strategy *Form_pg_dist_rebalance_strategy;
/* ----------------
* compiler constants for pg_dist_rebalance_strategy
* ----------------
*/
#define Natts_pg_dist_rebalance_strategy 7
#define Anum_pg_dist_rebalance_strategy_name 1
#define Anum_pg_dist_rebalance_strategy_default_strategy 2
#define Anum_pg_dist_rebalance_strategy_shard_cost_function 3
#define Anum_pg_dist_rebalance_strategy_node_capacity_function 4
#define Anum_pg_dist_rebalance_strategy_shard_allowed_on_node_function 5
#define Anum_pg_dist_rebalance_strategy_default_threshold 6
#define Anum_pg_dist_rebalance_strategy_minimum_threshold 7
#endif /* PG_DIST_REBALANCE_STRATEGY_H */

View File

@ -1 +1 @@
test: upgrade_basic_after upgrade_type_after upgrade_ref2ref_after upgrade_distributed_function_after
test: upgrade_basic_after upgrade_type_after upgrade_ref2ref_after upgrade_distributed_function_after upgrade_rebalance_strategy_after

View File

@ -2,4 +2,4 @@
test: multi_test_helpers
test: multi_test_catalog_views
test: upgrade_basic_before
test: upgrade_type_before upgrade_ref2ref_before upgrade_distributed_function_before
test: upgrade_type_before upgrade_ref2ref_before upgrade_distributed_function_before upgrade_rebalance_strategy_before

View File

@ -25,3 +25,7 @@ ERROR: cannot write to pg_dist_poolinfo
DETAIL: Citus Community Edition does not support the use of pooler options.
HINT: To learn more about using advanced pooling schemes with Citus, please contact us at https://citusdata.com/about/contact_us
ROLLBACK;
INSERT INTO pg_dist_rebalance_strategy VALUES ('should fail', false, 'citus_shard_cost_1', 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0, 0);
ERROR: cannot write to pg_dist_rebalance_strategy
DETAIL: Citus Community Edition does not support the use of custom rebalance strategies.
HINT: To learn more about using advanced rebalancing schemes with Citus, please contact us at https://citusdata.com/about/contact_us

View File

@ -0,0 +1,8 @@
SELECT * FROM pg_catalog.pg_dist_rebalance_strategy ORDER BY name;
name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold
-----------------+------------------+-----------------------------------------+---------------------------------------------------+------------------------------------------+-------------------+-------------------
by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01
by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0
custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2
(3 rows)

View File

@ -0,0 +1,38 @@
CREATE SCHEMA upgrade_rebalance_strategy;
SET search_path TO upgrade_rebalance_strategy, public;
-- The following function signatures should always keep working
CREATE FUNCTION shard_cost_2(bigint)
RETURNS float4 AS $$ SELECT 2.0::float4 $$ LANGUAGE sql;
CREATE FUNCTION capacity_high_worker_1(nodeidarg int)
RETURNS real AS $$
SELECT
(CASE WHEN nodeport = 57637 THEN 1000 ELSE 1 END)::real
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int)
RETURNS boolean AS $$
SELECT
(CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END)
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
SELECT citus_add_rebalance_strategy(
'custom_strategy',
'shard_cost_2',
'capacity_high_worker_1',
'only_worker_2',
0.5,
0.2
);
citus_add_rebalance_strategy
------------------------------
(1 row)
SELECT citus_set_default_rebalance_strategy('custom_strategy');
citus_set_default_rebalance_strategy
--------------------------------------
(1 row)
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;

View File

@ -21,3 +21,4 @@ BEGIN;
INSERT INTO pg_dist_node VALUES (1234567890, 1234567890, 'localhost', 5432);
INSERT INTO pg_dist_poolinfo VALUES (1234567890, 'port=1234');
ROLLBACK;
INSERT INTO pg_dist_rebalance_strategy VALUES ('should fail', false, 'citus_shard_cost_1', 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0, 0);

View File

@ -0,0 +1 @@
SELECT * FROM pg_catalog.pg_dist_rebalance_strategy ORDER BY name;

View File

@ -0,0 +1,32 @@
CREATE SCHEMA upgrade_rebalance_strategy;
SET search_path TO upgrade_rebalance_strategy, public;
-- The following function signatures should always keep working
CREATE FUNCTION shard_cost_2(bigint)
RETURNS float4 AS $$ SELECT 2.0::float4 $$ LANGUAGE sql;
CREATE FUNCTION capacity_high_worker_1(nodeidarg int)
RETURNS real AS $$
SELECT
(CASE WHEN nodeport = 57637 THEN 1000 ELSE 1 END)::real
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int)
RETURNS boolean AS $$
SELECT
(CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END)
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;
SELECT citus_add_rebalance_strategy(
'custom_strategy',
'shard_cost_2',
'capacity_high_worker_1',
'only_worker_2',
0.5,
0.2
);
SELECT citus_set_default_rebalance_strategy('custom_strategy');
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;