citus/src/backend/distributed/master/shard_rebalancer.c

205 lines
6.2 KiB
C

/*-------------------------------------------------------------------------
*
* shard_rebalancer.c
*
* Function definitions for the shard rebalancer tool.
*
* Copyright (c) Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "catalog/pg_proc.h"
#include "distributed/enterprise.h"
#include "utils/syscache.h"
static void EnsureShardCostUDF(Oid functionOid);
static void EnsureNodeCapacityUDF(Oid functionOid);
static void EnsureShardAllowedOnNodeUDF(Oid functionOid);
NOT_SUPPORTED_IN_COMMUNITY(rebalance_table_shards);
NOT_SUPPORTED_IN_COMMUNITY(replicate_table_shards);
NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_table_shards_plan);
NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_progress);
NOT_SUPPORTED_IN_COMMUNITY(master_drain_node);
NOT_SUPPORTED_IN_COMMUNITY(citus_shard_cost_by_disk_size);
PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check);
PG_FUNCTION_INFO_V1(citus_validate_rebalance_strategy_functions);
/*
* citus_rebalance_strategy_enterprise_check is trigger function, intended for
* use in prohibiting writes to pg_dist_rebalance_strategy in Citus Community.
*/
Datum
pg_dist_rebalance_strategy_enterprise_check(PG_FUNCTION_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot write to pg_dist_rebalance_strategy"),
errdetail(
"Citus Community Edition does not support the use of "
"custom rebalance strategies."),
errhint(
"To learn more about using advanced rebalancing schemes "
"with Citus, please contact us at "
"https://citusdata.com/about/contact_us")));
}
/*
* citus_validate_rebalance_strategy_functions checks all the functions for
* their correct signature.
*
* SQL signature:
*
* citus_validate_rebalance_strategy_functions(
* shard_cost_function regproc,
* node_capacity_function regproc,
* shard_allowed_on_node_function regproc,
* ) RETURNS VOID
*/
Datum
citus_validate_rebalance_strategy_functions(PG_FUNCTION_ARGS)
{
EnsureShardCostUDF(PG_GETARG_OID(0));
EnsureNodeCapacityUDF(PG_GETARG_OID(1));
EnsureShardAllowedOnNodeUDF(PG_GETARG_OID(2));
PG_RETURN_VOID();
}
/*
* EnsureShardCostUDF checks that the UDF matching the oid has the correct
* signature to be used as a ShardCost function. The expected signature is:
*
* shard_cost(shardid bigint) returns float4
*/
static void
EnsureShardCostUDF(Oid functionOid)
{
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
if (!HeapTupleIsValid(proctup))
{
ereport(ERROR, (errmsg("cache lookup failed for shard_cost_function with oid %u",
functionOid)));
}
Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(proctup);
char *name = NameStr(procForm->proname);
if (procForm->pronargs != 1)
{
ereport(ERROR, (errmsg("signature for shard_cost_function is incorrect"),
errdetail(
"number of arguments of %s should be 1, not %i",
name, procForm->pronargs)));
}
if (procForm->proargtypes.values[0] != INT8OID)
{
ereport(ERROR, (errmsg("signature for shard_cost_function is incorrect"),
errdetail(
"argument type of %s should be bigint", name)));
}
if (procForm->prorettype != FLOAT4OID)
{
ereport(ERROR, (errmsg("signature for shard_cost_function is incorrect"),
errdetail("return type of %s should be real", name)));
}
ReleaseSysCache(proctup);
}
/*
* EnsureNodeCapacityUDF checks that the UDF matching the oid has the correct
* signature to be used as a NodeCapacity function. The expected signature is:
*
* node_capacity(nodeid int) returns float4
*/
static void
EnsureNodeCapacityUDF(Oid functionOid)
{
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
if (!HeapTupleIsValid(proctup))
{
ereport(ERROR, (errmsg(
"cache lookup failed for node_capacity_function with oid %u",
functionOid)));
}
Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(proctup);
char *name = NameStr(procForm->proname);
if (procForm->pronargs != 1)
{
ereport(ERROR, (errmsg("signature for node_capacity_function is incorrect"),
errdetail(
"number of arguments of %s should be 1, not %i",
name, procForm->pronargs)));
}
if (procForm->proargtypes.values[0] != INT4OID)
{
ereport(ERROR, (errmsg("signature for node_capacity_function is incorrect"),
errdetail("argument type of %s should be int", name)));
}
if (procForm->prorettype != FLOAT4OID)
{
ereport(ERROR, (errmsg("signature for node_capacity_function is incorrect"),
errdetail("return type of %s should be real", name)));
}
ReleaseSysCache(proctup);
}
/*
* EnsureNodeCapacityUDF checks that the UDF matching the oid has the correct
* signature to be used as a NodeCapacity function. The expected signature is:
*
* shard_allowed_on_node(shardid bigint, nodeid int) returns boolean
*/
static void
EnsureShardAllowedOnNodeUDF(Oid functionOid)
{
HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid));
if (!HeapTupleIsValid(proctup))
{
ereport(ERROR, (errmsg(
"cache lookup failed for shard_allowed_on_node_function with oid %u",
functionOid)));
}
Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(proctup);
char *name = NameStr(procForm->proname);
if (procForm->pronargs != 2)
{
ereport(ERROR, (errmsg(
"signature for shard_allowed_on_node_function is incorrect"),
errdetail(
"number of arguments of %s should be 2, not %i",
name, procForm->pronargs)));
}
if (procForm->proargtypes.values[0] != INT8OID)
{
ereport(ERROR, (errmsg(
"signature for shard_allowed_on_node_function is incorrect"),
errdetail(
"type of first argument of %s should be bigint", name)));
}
if (procForm->proargtypes.values[1] != INT4OID)
{
ereport(ERROR, (errmsg(
"signature for shard_allowed_on_node_function is incorrect"),
errdetail(
"type of second argument of %s should be int", name)));
}
if (procForm->prorettype != BOOLOID)
{
ereport(ERROR, (errmsg(
"signature for shard_allowed_on_node_function is incorrect"),
errdetail(
"return type of %s should be boolean", name)));
}
ReleaseSysCache(proctup);
}