From b655c023522b46b61f412198b4837a1a3facf9fe Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Thu, 19 Dec 2019 15:23:08 +0100 Subject: [PATCH] Add the necessary changes for rebalance strategies on enterprise (#3325) This commit adds the SQL and C changes necessary to support custom rebalance strategies in the Enterprise version of Citus. --- .../master/master_metadata_utility.c | 30 ++- .../distributed/master/shard_rebalancer.c | 184 ++++++++++++++++++ .../distributed/metadata/metadata_cache.c | 12 ++ .../distributed/sql/citus--9.1-1--9.2-1.sql | 70 +++++++ .../citus_add_rebalance_strategy/9.2-1.sql | 28 +++ .../citus_add_rebalance_strategy/latest.sql | 28 +++ .../udfs/citus_finish_pg_upgrade/9.2-1.sql | 113 +++++++++++ .../udfs/citus_finish_pg_upgrade/latest.sql | 12 ++ .../sql/udfs/citus_node_capacity_1/9.2-1.sql | 4 + .../sql/udfs/citus_node_capacity_1/latest.sql | 4 + .../udfs/citus_prepare_pg_upgrade/9.2-1.sql | 38 ++++ .../udfs/citus_prepare_pg_upgrade/latest.sql | 9 + .../9.2-1.sql | 18 ++ .../latest.sql | 18 ++ .../9.2-1.sql | 5 + .../latest.sql | 5 + .../sql/udfs/citus_shard_cost_1/9.2-1.sql | 4 + .../sql/udfs/citus_shard_cost_1/latest.sql | 4 + .../citus_shard_cost_by_disk_size/9.2-1.sql | 6 + .../citus_shard_cost_by_disk_size/latest.sql | 6 + .../9.2-1.sql | 10 + .../latest.sql | 10 + .../get_rebalance_table_shards_plan/9.2-1.sql | 26 +++ .../latest.sql | 9 +- .../sql/udfs/master_drain_node/9.2-1.sql | 14 ++ .../sql/udfs/master_drain_node/latest.sql | 11 +- .../9.2-1.sql | 28 +++ .../latest.sql | 28 +++ .../sql/udfs/rebalance_table_shards/9.2-1.sql | 18 ++ .../udfs/rebalance_table_shards/latest.sql | 8 +- src/include/distributed/argutils.h | 53 +++++ .../distributed/master_metadata_utility.h | 2 + src/include/distributed/metadata_cache.h | 1 + .../distributed/pg_dist_rebalance_strategy.h | 54 +++++ src/test/regress/after_pg_upgrade_schedule | 2 +- src/test/regress/before_pg_upgrade_schedule | 2 +- .../expected/multi_utility_warnings.out | 4 + 
.../upgrade_rebalance_strategy_after.out | 8 + .../upgrade_rebalance_strategy_before.out | 38 ++++ .../regress/sql/multi_utility_warnings.sql | 1 + .../sql/upgrade_rebalance_strategy_after.sql | 1 + .../sql/upgrade_rebalance_strategy_before.sql | 32 +++ 42 files changed, 928 insertions(+), 30 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_node_capacity_1/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_node_capacity_1/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_set_default_rebalance_strategy/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_set_default_rebalance_strategy/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_shard_allowed_on_node_true/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_shard_allowed_on_node_true/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_shard_cost_1/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_shard_cost_1/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_shard_cost_by_disk_size/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_shard_cost_by_disk_size/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_validate_rebalance_strategy_functions/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_validate_rebalance_strategy_functions/latest.sql create mode 100644 src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/master_drain_node/9.2-1.sql create mode 100644 
src/backend/distributed/sql/udfs/pg_dist_rebalance_strategy_trigger_func/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/pg_dist_rebalance_strategy_trigger_func/latest.sql create mode 100644 src/backend/distributed/sql/udfs/rebalance_table_shards/9.2-1.sql create mode 100644 src/include/distributed/argutils.h create mode 100644 src/include/distributed/pg_dist_rebalance_strategy.h create mode 100644 src/test/regress/expected/upgrade_rebalance_strategy_after.out create mode 100644 src/test/regress/expected/upgrade_rebalance_strategy_before.out create mode 100644 src/test/regress/sql/upgrade_rebalance_strategy_after.sql create mode 100644 src/test/regress/sql/upgrade_rebalance_strategy_before.sql diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index e9c6e5e3b..23156ffcb 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -71,9 +71,6 @@ static uint64 DistributedTableSize(Oid relationId, char *sizeQuery); static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery); static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId); -static StringInfo GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId, - List *shardIntervalList, - char *sizeQuery); static void ErrorIfNotSuitableToGetSize(Oid relationId); @@ -212,9 +209,9 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId); - StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(relationId, - shardIntervalsOnNode, - sizeQuery); + StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements( + shardIntervalsOnNode, + sizeQuery); MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName, workerNodePort); @@ -328,19 +325,14 @@ 
ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId) /* * GenerateSizeQueryOnMultiplePlacements generates a select size query to get - * size of multiple tables from the relation with distributedRelationId. Note - * that, different size functions supported by PG are also supported by this - * function changing the size query given as the last parameter to function. - * Format of sizeQuery is pg_*_size(%s). Examples of it can be found in the - * master_protocol.h + * size of multiple tables. Note that, different size functions supported by PG + * are also supported by this function changing the size query given as the + * last parameter to function. Format of sizeQuery is pg_*_size(%s). Examples + * of it can be found in the master_protocol.h */ -static StringInfo -GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId, List *shardIntervalList, - char *sizeQuery) +StringInfo +GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery) { - Oid schemaId = get_rel_namespace(distributedRelationId); - char *schemaName = get_namespace_name(schemaId); - StringInfo selectQuery = makeStringInfo(); ListCell *shardIntervalCell = NULL; @@ -350,7 +342,9 @@ GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId, List *shardInte { ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); uint64 shardId = shardInterval->shardId; - char *shardName = get_rel_name(distributedRelationId); + Oid schemaId = get_rel_namespace(shardInterval->relationId); + char *schemaName = get_namespace_name(schemaId); + char *shardName = get_rel_name(shardInterval->relationId); AppendShardIdToName(&shardName, shardId); char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); diff --git a/src/backend/distributed/master/shard_rebalancer.c b/src/backend/distributed/master/shard_rebalancer.c index 194a430d3..dda7a849f 100644 --- a/src/backend/distributed/master/shard_rebalancer.c +++ 
b/src/backend/distributed/master/shard_rebalancer.c @@ -11,10 +11,194 @@ *------------------------------------------------------------------------- */ +#include "postgres.h" + +#include "access/htup_details.h" +#include "catalog/pg_type.h" +#include "catalog/pg_proc.h" #include "distributed/enterprise.h" +#include "utils/syscache.h" + + +static void EnsureShardCostUDF(Oid functionOid); +static void EnsureNodeCapacityUDF(Oid functionOid); +static void EnsureShardAllowedOnNodeUDF(Oid functionOid); NOT_SUPPORTED_IN_COMMUNITY(rebalance_table_shards); NOT_SUPPORTED_IN_COMMUNITY(replicate_table_shards); NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_table_shards_plan); NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_progress); NOT_SUPPORTED_IN_COMMUNITY(master_drain_node); +NOT_SUPPORTED_IN_COMMUNITY(citus_shard_cost_by_disk_size); +PG_FUNCTION_INFO_V1(pg_dist_rebalance_strategy_enterprise_check); +PG_FUNCTION_INFO_V1(citus_validate_rebalance_strategy_functions); + + +/* + * pg_dist_rebalance_strategy_enterprise_check is a trigger function, intended for + * use in prohibiting writes to pg_dist_rebalance_strategy in Citus Community. + */ +Datum +pg_dist_rebalance_strategy_enterprise_check(PG_FUNCTION_ARGS) +{ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot write to pg_dist_rebalance_strategy"), + errdetail( + "Citus Community Edition does not support the use of " + "custom rebalance strategies."), + errhint( + "To learn more about using advanced rebalancing schemes " + "with Citus, please contact us at " + "https://citusdata.com/about/contact_us"))); +} + + +/* + * citus_validate_rebalance_strategy_functions checks all the functions for + * their correct signature.
+ * + * SQL signature: + * + * citus_validate_rebalance_strategy_functions( + * shard_cost_function regproc, + * node_capacity_function regproc, + * shard_allowed_on_node_function regproc + * ) RETURNS VOID + */ +Datum +citus_validate_rebalance_strategy_functions(PG_FUNCTION_ARGS) +{ + EnsureShardCostUDF(PG_GETARG_OID(0)); + EnsureNodeCapacityUDF(PG_GETARG_OID(1)); + EnsureShardAllowedOnNodeUDF(PG_GETARG_OID(2)); + PG_RETURN_VOID(); +} + + +/* + * EnsureShardCostUDF checks that the UDF matching the oid has the correct + * signature to be used as a ShardCost function. The expected signature is: + * + * shard_cost(shardid bigint) returns float4 + */ +static void +EnsureShardCostUDF(Oid functionOid) +{ + HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid)); + if (!HeapTupleIsValid(proctup)) + { + ereport(ERROR, (errmsg("cache lookup failed for shard_cost_function with oid %u", + functionOid))); + } + Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(proctup); + char *name = NameStr(procForm->proname); + if (procForm->pronargs != 1) + { + ereport(ERROR, (errmsg("signature for shard_cost_function is incorrect"), + errdetail( + "number of arguments of %s should be 1, not %i", + name, procForm->pronargs))); + } + if (procForm->proargtypes.values[0] != INT8OID) + { + ereport(ERROR, (errmsg("signature for shard_cost_function is incorrect"), + errdetail( + "argument type of %s should be bigint", name))); + } + if (procForm->prorettype != FLOAT4OID) + { + ereport(ERROR, (errmsg("signature for shard_cost_function is incorrect"), + errdetail("return type of %s should be real", name))); + } + ReleaseSysCache(proctup); +} + + +/* + * EnsureNodeCapacityUDF checks that the UDF matching the oid has the correct + * signature to be used as a NodeCapacity function.
The expected signature is: + * + * node_capacity(nodeid int) returns float4 + */ +static void +EnsureNodeCapacityUDF(Oid functionOid) +{ + HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid)); + if (!HeapTupleIsValid(proctup)) + { + ereport(ERROR, (errmsg( + "cache lookup failed for node_capacity_function with oid %u", + functionOid))); + } + Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(proctup); + char *name = NameStr(procForm->proname); + if (procForm->pronargs != 1) + { + ereport(ERROR, (errmsg("signature for node_capacity_function is incorrect"), + errdetail( + "number of arguments of %s should be 1, not %i", + name, procForm->pronargs))); + } + if (procForm->proargtypes.values[0] != INT4OID) + { + ereport(ERROR, (errmsg("signature for node_capacity_function is incorrect"), + errdetail("argument type of %s should be int", name))); + } + if (procForm->prorettype != FLOAT4OID) + { + ereport(ERROR, (errmsg("signature for node_capacity_function is incorrect"), + errdetail("return type of %s should be real", name))); + } + ReleaseSysCache(proctup); +} + + +/* + * EnsureShardAllowedOnNodeUDF checks that the UDF matching the oid has the correct + * signature to be used as a ShardAllowedOnNode function.
The expected signature is: + * + * shard_allowed_on_node(shardid bigint, nodeid int) returns boolean + */ +static void +EnsureShardAllowedOnNodeUDF(Oid functionOid) +{ + HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid)); + if (!HeapTupleIsValid(proctup)) + { + ereport(ERROR, (errmsg( + "cache lookup failed for shard_allowed_on_node_function with oid %u", + functionOid))); + } + Form_pg_proc procForm = (Form_pg_proc) GETSTRUCT(proctup); + char *name = NameStr(procForm->proname); + if (procForm->pronargs != 2) + { + ereport(ERROR, (errmsg( + "signature for shard_allowed_on_node_function is incorrect"), + errdetail( + "number of arguments of %s should be 2, not %i", + name, procForm->pronargs))); + } + if (procForm->proargtypes.values[0] != INT8OID) + { + ereport(ERROR, (errmsg( + "signature for shard_allowed_on_node_function is incorrect"), + errdetail( + "type of first argument of %s should be bigint", name))); + } + if (procForm->proargtypes.values[1] != INT4OID) + { + ereport(ERROR, (errmsg( + "signature for shard_allowed_on_node_function is incorrect"), + errdetail( + "type of second argument of %s should be int", name))); + } + if (procForm->prorettype != BOOLOID) + { + ereport(ERROR, (errmsg( + "signature for shard_allowed_on_node_function is incorrect"), + errdetail( + "return type of %s should be boolean", name))); + } + ReleaseSysCache(proctup); +} diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 5613fe801..df1d0625e 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -114,6 +114,7 @@ typedef struct MetadataCacheData bool extensionLoaded; Oid distShardRelationId; Oid distPlacementRelationId; + Oid distRebalanceStrategyRelationId; Oid distNodeRelationId; Oid distNodeNodeIdIndexId; Oid distLocalGroupRelationId; @@ -1860,6 +1861,17 @@ DistLocalGroupIdRelationId(void) } +/* return oid of 
pg_dist_rebalance_strategy relation */ +Oid +DistRebalanceStrategyRelationId(void) +{ + CachedRelationLookup("pg_dist_rebalance_strategy", + &MetadataCache.distRebalanceStrategyRelationId); + + return MetadataCache.distRebalanceStrategyRelationId; +} + + /* return the oid of citus namespace */ Oid CitusCatalogNamespaceId(void) diff --git a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql index 5cdc5bf22..79a04e608 100644 --- a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql +++ b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql @@ -12,3 +12,73 @@ DROP INDEX pg_dist_colocation_configuration_index; CREATE INDEX pg_dist_colocation_configuration_index ON pg_dist_colocation USING btree(distributioncolumntype, shardcount, replicationfactor, distributioncolumncollation); +CREATE TABLE citus.pg_dist_rebalance_strategy( + name name NOT NULL, + default_strategy boolean NOT NULL DEFAULT false, + shard_cost_function regproc NOT NULL, + node_capacity_function regproc NOT NULL, + shard_allowed_on_node_function regproc NOT NULL, + default_threshold float4 NOT NULL, + minimum_threshold float4 NOT NULL DEFAULT 0, + UNIQUE(name) +); +ALTER TABLE citus.pg_dist_rebalance_strategy SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.pg_dist_rebalance_strategy TO public; + +#include "udfs/citus_validate_rebalance_strategy_functions/9.2-1.sql" +#include "udfs/pg_dist_rebalance_strategy_trigger_func/9.2-1.sql" +CREATE TRIGGER pg_dist_rebalance_strategy_validation_trigger + BEFORE INSERT OR UPDATE ON pg_dist_rebalance_strategy + FOR EACH ROW EXECUTE PROCEDURE citus_internal.pg_dist_rebalance_strategy_trigger_func(); + +#include "udfs/citus_add_rebalance_strategy/9.2-1.sql" +#include "udfs/citus_set_default_rebalance_strategy/9.2-1.sql" + +#include "udfs/citus_shard_cost_1/9.2-1.sql" +#include "udfs/citus_shard_cost_by_disk_size/9.2-1.sql" +#include "udfs/citus_node_capacity_1/9.2-1.sql" +#include 
"udfs/citus_shard_allowed_on_node_true/9.2-1.sql" + +INSERT INTO + pg_catalog.pg_dist_rebalance_strategy( + name, + default_strategy, + shard_cost_function, + node_capacity_function, + shard_allowed_on_node_function, + default_threshold, + minimum_threshold + ) VALUES ( + 'by_shard_count', + true, + 'citus_shard_cost_1', + 'citus_node_capacity_1', + 'citus_shard_allowed_on_node_true', + 0, + 0 + ), ( + 'by_disk_size', + false, + 'citus_shard_cost_by_disk_size', + 'citus_node_capacity_1', + 'citus_shard_allowed_on_node_true', + 0.1, + 0.01 + ); + + +CREATE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check() + RETURNS TRIGGER + LANGUAGE C + AS 'MODULE_PATHNAME'; +CREATE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger + BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE ON pg_dist_rebalance_strategy + FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check(); + + +#include "udfs/master_drain_node/9.2-1.sql" +#include "udfs/rebalance_table_shards/9.2-1.sql" +#include "udfs/get_rebalance_table_shards_plan/9.2-1.sql" + +#include "udfs/citus_prepare_pg_upgrade/9.2-1.sql" +#include "udfs/citus_finish_pg_upgrade/9.2-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/9.2-1.sql b/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/9.2-1.sql new file mode 100644 index 000000000..aeffc9c00 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/9.2-1.sql @@ -0,0 +1,28 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy( + name name, + shard_cost_function regproc, + node_capacity_function regproc, + shard_allowed_on_node_function regproc, + default_threshold float4, + minimum_threshold float4 DEFAULT 0 +) + RETURNS VOID AS $$ + INSERT INTO + pg_catalog.pg_dist_rebalance_strategy( + name, + shard_cost_function, + node_capacity_function, + shard_allowed_on_node_function, + default_threshold, + minimum_threshold + ) VALUES ( + name, + 
shard_cost_function, + node_capacity_function, + shard_allowed_on_node_function, + default_threshold, + minimum_threshold + ); + $$ LANGUAGE sql; +COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4) + IS 'adds a new rebalance strategy which can be used when rebalancing shards or draining nodes'; diff --git a/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/latest.sql b/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/latest.sql new file mode 100644 index 000000000..aeffc9c00 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_add_rebalance_strategy/latest.sql @@ -0,0 +1,28 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_add_rebalance_strategy( + name name, + shard_cost_function regproc, + node_capacity_function regproc, + shard_allowed_on_node_function regproc, + default_threshold float4, + minimum_threshold float4 DEFAULT 0 +) + RETURNS VOID AS $$ + INSERT INTO + pg_catalog.pg_dist_rebalance_strategy( + name, + shard_cost_function, + node_capacity_function, + shard_allowed_on_node_function, + default_threshold, + minimum_threshold + ) VALUES ( + name, + shard_cost_function, + node_capacity_function, + shard_allowed_on_node_function, + default_threshold, + minimum_threshold + ); + $$ LANGUAGE sql; +COMMENT ON FUNCTION pg_catalog.citus_add_rebalance_strategy(name,regproc,regproc,regproc,float4, float4) + IS 'adds a new rebalance strategy which can be used when rebalancing shards or draining nodes'; diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/9.2-1.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/9.2-1.sql new file mode 100644 index 000000000..d8cc4386d --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/9.2-1.sql @@ -0,0 +1,113 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade() + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $cppu$ +DECLARE + table_name regclass; + command text; 
+ trigger_name text; +BEGIN + -- + -- restore citus catalog tables + -- + INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition; + INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard; + INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement; + INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata; + INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node; + INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group; + INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction; + INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation; + -- enterprise catalog tables + INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo; + INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo; + + ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger; + INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT + name, + default_strategy, + shard_cost_function::regprocedure::regproc, + node_capacity_function::regprocedure::regproc, + shard_allowed_on_node_function::regprocedure::regproc, + default_threshold, + minimum_threshold + FROM public.pg_dist_rebalance_strategy; + ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger; + + -- + -- drop backup tables + -- + DROP TABLE public.pg_dist_authinfo; + DROP TABLE public.pg_dist_colocation; + DROP TABLE public.pg_dist_local_group; + DROP TABLE public.pg_dist_node; + DROP TABLE public.pg_dist_node_metadata; + DROP TABLE public.pg_dist_partition; + DROP TABLE public.pg_dist_placement; + DROP TABLE public.pg_dist_poolinfo; + DROP TABLE public.pg_dist_shard; + DROP TABLE public.pg_dist_transaction; + + -- + -- reset sequences + -- + PERFORM 
setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false); + PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false); + PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false); + PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false); + PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false); + + -- + -- register triggers + -- + FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition + LOOP + trigger_name := 'truncate_trigger_' || table_name::oid; + command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()'; + EXECUTE command; + command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name); + EXECUTE command; + END LOOP; + + -- + -- set dependencies + -- + INSERT INTO pg_depend + SELECT + 'pg_class'::regclass::oid as classid, + p.logicalrelid::regclass::oid as objid, + 0 as objsubid, + 'pg_extension'::regclass::oid as refclassid, + (select oid from pg_extension where extname = 'citus') as refobjid, + 0 as refobjsubid , + 'n' as deptype + FROM pg_catalog.pg_dist_partition p; + + -- restore pg_dist_object from the stable identifiers + -- DELETE/INSERT to avoid primary key violations + WITH old_records AS ( + DELETE FROM + citus.pg_dist_object + RETURNING + type, + object_names, + object_args, + distribution_argument_index, + colocationid + ) + INSERT INTO citus.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid) + SELECT + address.classid, + address.objid, + address.objsubid, + naming.distribution_argument_index, + naming.colocationid + FROM + old_records naming, + 
pg_get_object_address(naming.type, naming.object_names, naming.object_args) address; +END; +$cppu$; + +COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade() + IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade'; diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql index 042f649e7..d8cc4386d 100644 --- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql @@ -23,6 +23,18 @@ BEGIN INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo; INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo; + ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger; + INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT + name, + default_strategy, + shard_cost_function::regprocedure::regproc, + node_capacity_function::regprocedure::regproc, + shard_allowed_on_node_function::regprocedure::regproc, + default_threshold, + minimum_threshold + FROM public.pg_dist_rebalance_strategy; + ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger; + -- -- drop backup tables -- diff --git a/src/backend/distributed/sql/udfs/citus_node_capacity_1/9.2-1.sql b/src/backend/distributed/sql/udfs/citus_node_capacity_1/9.2-1.sql new file mode 100644 index 000000000..ea21f5143 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_node_capacity_1/9.2-1.sql @@ -0,0 +1,4 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_node_capacity_1(int) + RETURNS float4 AS $$ SELECT 1.0::float4 $$ LANGUAGE sql; +COMMENT ON FUNCTION pg_catalog.citus_node_capacity_1(int) + IS 'a node capacity function for use by the rebalance algorithm that always returns 1'; diff --git 
a/src/backend/distributed/sql/udfs/citus_node_capacity_1/latest.sql b/src/backend/distributed/sql/udfs/citus_node_capacity_1/latest.sql new file mode 100644 index 000000000..ea21f5143 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_node_capacity_1/latest.sql @@ -0,0 +1,4 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_node_capacity_1(int) + RETURNS float4 AS $$ SELECT 1.0::float4 $$ LANGUAGE sql; +COMMENT ON FUNCTION pg_catalog.citus_node_capacity_1(int) + IS 'a node capacity function for use by the rebalance algorithm that always returns 1'; diff --git a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/9.2-1.sql b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/9.2-1.sql new file mode 100644 index 000000000..7f0d2affd --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/9.2-1.sql @@ -0,0 +1,38 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade() + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $cppu$ +BEGIN + -- + -- backup citus catalog tables + -- + CREATE TABLE public.pg_dist_partition AS SELECT * FROM pg_catalog.pg_dist_partition; + CREATE TABLE public.pg_dist_shard AS SELECT * FROM pg_catalog.pg_dist_shard; + CREATE TABLE public.pg_dist_placement AS SELECT * FROM pg_catalog.pg_dist_placement; + CREATE TABLE public.pg_dist_node_metadata AS SELECT * FROM pg_catalog.pg_dist_node_metadata; + CREATE TABLE public.pg_dist_node AS SELECT * FROM pg_catalog.pg_dist_node; + CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group; + CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction; + CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation; + -- enterprise catalog tables + CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo; + CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo; + CREATE TABLE 
public.pg_dist_rebalance_strategy AS SELECT + name, + default_strategy, + shard_cost_function::regprocedure::text, + node_capacity_function::regprocedure::text, + shard_allowed_on_node_function::regprocedure::text, + default_threshold, + minimum_threshold + FROM pg_catalog.pg_dist_rebalance_strategy; + + -- store upgrade stable identifiers on pg_dist_object catalog + UPDATE citus.pg_dist_object + SET (type, object_names, object_args) = (SELECT * FROM pg_identify_object_as_address(classid, objid, objsubid)); +END; +$cppu$; + +COMMENT ON FUNCTION pg_catalog.citus_prepare_pg_upgrade() + IS 'perform tasks to copy citus settings to a location that could later be restored after pg_upgrade is done'; diff --git a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql index e723cd4bb..7f0d2affd 100644 --- a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql @@ -18,6 +18,15 @@ BEGIN -- enterprise catalog tables CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo; CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo; + CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT + name, + default_strategy, + shard_cost_function::regprocedure::text, + node_capacity_function::regprocedure::text, + shard_allowed_on_node_function::regprocedure::text, + default_threshold, + minimum_threshold + FROM pg_catalog.pg_dist_rebalance_strategy; -- store upgrade stable identifiers on pg_dist_object catalog UPDATE citus.pg_dist_object diff --git a/src/backend/distributed/sql/udfs/citus_set_default_rebalance_strategy/9.2-1.sql b/src/backend/distributed/sql/udfs/citus_set_default_rebalance_strategy/9.2-1.sql new file mode 100644 index 000000000..c930b6f10 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_set_default_rebalance_strategy/9.2-1.sql @@ -0,0 
+1,18 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_set_default_rebalance_strategy( + name text +) + RETURNS VOID + STRICT +AS $$ + BEGIN + LOCK TABLE pg_dist_rebalance_strategy IN SHARE ROW EXCLUSIVE MODE; + IF NOT EXISTS (SELECT 1 FROM pg_dist_rebalance_strategy t WHERE t.name = $1) THEN + RAISE EXCEPTION 'strategy with specified name does not exist'; + END IF; + UPDATE pg_dist_rebalance_strategy SET default_strategy = false WHERE default_strategy = true; + UPDATE pg_dist_rebalance_strategy t SET default_strategy = true WHERE t.name = $1; + END; +$$ LANGUAGE plpgsql; + +COMMENT ON FUNCTION pg_catalog.citus_set_default_rebalance_strategy(text) + IS 'changes the default rebalance strategy to the one with the specified name'; diff --git a/src/backend/distributed/sql/udfs/citus_set_default_rebalance_strategy/latest.sql b/src/backend/distributed/sql/udfs/citus_set_default_rebalance_strategy/latest.sql new file mode 100644 index 000000000..c930b6f10 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_set_default_rebalance_strategy/latest.sql @@ -0,0 +1,18 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_set_default_rebalance_strategy( + name text +) + RETURNS VOID + STRICT +AS $$ + BEGIN + LOCK TABLE pg_dist_rebalance_strategy IN SHARE ROW EXCLUSIVE MODE; + IF NOT EXISTS (SELECT 1 FROM pg_dist_rebalance_strategy t WHERE t.name = $1) THEN + RAISE EXCEPTION 'strategy with specified name does not exist'; + END IF; + UPDATE pg_dist_rebalance_strategy SET default_strategy = false WHERE default_strategy = true; + UPDATE pg_dist_rebalance_strategy t SET default_strategy = true WHERE t.name = $1; + END; +$$ LANGUAGE plpgsql; + +COMMENT ON FUNCTION pg_catalog.citus_set_default_rebalance_strategy(text) + IS 'changes the default rebalance strategy to the one with the specified name'; diff --git a/src/backend/distributed/sql/udfs/citus_shard_allowed_on_node_true/9.2-1.sql b/src/backend/distributed/sql/udfs/citus_shard_allowed_on_node_true/9.2-1.sql new file mode 100644 
index 000000000..3af68e072 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shard_allowed_on_node_true/9.2-1.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_allowed_on_node_true(bigint, int) + RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql; +COMMENT ON FUNCTION pg_catalog.citus_shard_allowed_on_node_true(bigint,int) + IS 'a shard_allowed_on_node_function for use by the rebalance algorithm that always returns true'; + diff --git a/src/backend/distributed/sql/udfs/citus_shard_allowed_on_node_true/latest.sql b/src/backend/distributed/sql/udfs/citus_shard_allowed_on_node_true/latest.sql new file mode 100644 index 000000000..3af68e072 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shard_allowed_on_node_true/latest.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_allowed_on_node_true(bigint, int) + RETURNS boolean AS $$ SELECT true $$ LANGUAGE sql; +COMMENT ON FUNCTION pg_catalog.citus_shard_allowed_on_node_true(bigint,int) + IS 'a shard_allowed_on_node_function for use by the rebalance algorithm that always returns true'; + diff --git a/src/backend/distributed/sql/udfs/citus_shard_cost_1/9.2-1.sql b/src/backend/distributed/sql/udfs/citus_shard_cost_1/9.2-1.sql new file mode 100644 index 000000000..37ce15d64 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shard_cost_1/9.2-1.sql @@ -0,0 +1,4 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_cost_1(bigint) + RETURNS float4 AS $$ SELECT 1.0::float4 $$ LANGUAGE sql; +COMMENT ON FUNCTION pg_catalog.citus_shard_cost_1(bigint) + IS 'a shard cost function for use by the rebalance algorithm that always returns 1'; diff --git a/src/backend/distributed/sql/udfs/citus_shard_cost_1/latest.sql b/src/backend/distributed/sql/udfs/citus_shard_cost_1/latest.sql new file mode 100644 index 000000000..37ce15d64 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shard_cost_1/latest.sql @@ -0,0 +1,4 @@ +CREATE OR REPLACE FUNCTION 
pg_catalog.citus_shard_cost_1(bigint) + RETURNS float4 AS $$ SELECT 1.0::float4 $$ LANGUAGE sql; +COMMENT ON FUNCTION pg_catalog.citus_shard_cost_1(bigint) + IS 'a shard cost function for use by the rebalance algorithm that always returns 1'; diff --git a/src/backend/distributed/sql/udfs/citus_shard_cost_by_disk_size/9.2-1.sql b/src/backend/distributed/sql/udfs/citus_shard_cost_by_disk_size/9.2-1.sql new file mode 100644 index 000000000..262fb3d13 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shard_cost_by_disk_size/9.2-1.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_cost_by_disk_size(bigint) + RETURNS float4 + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_shard_cost_by_disk_size(bigint) + IS 'a shard cost function for use by the rebalance algorithm that returns the disk size in bytes for the specified shard and the shards that are colocated with it'; diff --git a/src/backend/distributed/sql/udfs/citus_shard_cost_by_disk_size/latest.sql b/src/backend/distributed/sql/udfs/citus_shard_cost_by_disk_size/latest.sql new file mode 100644 index 000000000..262fb3d13 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shard_cost_by_disk_size/latest.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_cost_by_disk_size(bigint) + RETURNS float4 + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_shard_cost_by_disk_size(bigint) + IS 'a shard cost function for use by the rebalance algorithm that returns the disk size in bytes for the specified shard and the shards that are colocated with it'; diff --git a/src/backend/distributed/sql/udfs/citus_validate_rebalance_strategy_functions/9.2-1.sql b/src/backend/distributed/sql/udfs/citus_validate_rebalance_strategy_functions/9.2-1.sql new file mode 100644 index 000000000..564d80a26 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_validate_rebalance_strategy_functions/9.2-1.sql @@ 
-0,0 +1,10 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_validate_rebalance_strategy_functions( + shard_cost_function regproc, + node_capacity_function regproc, + shard_allowed_on_node_function regproc +) + RETURNS VOID + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_validate_rebalance_strategy_functions(regproc,regproc,regproc) + IS 'internal function used by citus to validate signatures of functions used in rebalance strategy'; diff --git a/src/backend/distributed/sql/udfs/citus_validate_rebalance_strategy_functions/latest.sql b/src/backend/distributed/sql/udfs/citus_validate_rebalance_strategy_functions/latest.sql new file mode 100644 index 000000000..564d80a26 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_validate_rebalance_strategy_functions/latest.sql @@ -0,0 +1,10 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_validate_rebalance_strategy_functions( + shard_cost_function regproc, + node_capacity_function regproc, + shard_allowed_on_node_function regproc +) + RETURNS VOID + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_validate_rebalance_strategy_functions(regproc,regproc,regproc) + IS 'internal function used by citus to validate signatures of functions used in rebalance strategy'; diff --git a/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/9.2-1.sql b/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/9.2-1.sql new file mode 100644 index 000000000..7970a61dd --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/9.2-1.sql @@ -0,0 +1,26 @@ +-- get_rebalance_table_shards_plan shows the actual events that will be performed +-- if a rebalance operation will be performed with the same arguments, which allows users +-- to understand the impact of the change on overall availability of the application and +-- network traffic. 
+-- +DROP FUNCTION pg_catalog.get_rebalance_table_shards_plan; +CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan( + relation regclass default NULL, + threshold float4 default NULL, + max_shard_moves int default 1000000, + excluded_shard_list bigint[] default '{}', + drain_only boolean default false, + rebalance_strategy name default NULL + ) + RETURNS TABLE (table_name regclass, + shardid bigint, + shard_size bigint, + sourcename text, + sourceport int, + targetname text, + targetport int) + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name) + IS 'returns the list of shard placement moves to be done on a rebalance operation'; + diff --git a/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/latest.sql b/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/latest.sql index 68fb854e5..7970a61dd 100644 --- a/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/latest.sql +++ b/src/backend/distributed/sql/udfs/get_rebalance_table_shards_plan/latest.sql @@ -6,10 +6,12 @@ DROP FUNCTION pg_catalog.get_rebalance_table_shards_plan; CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan( relation regclass default NULL, - threshold float4 default 0, + threshold float4 default NULL, max_shard_moves int default 1000000, excluded_shard_list bigint[] default '{}', - drain_only boolean default false) + drain_only boolean default false, + rebalance_strategy name default NULL + ) RETURNS TABLE (table_name regclass, shardid bigint, shard_size bigint, @@ -19,5 +21,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.get_rebalance_table_shards_plan( targetport int) AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE; -COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean) +COMMENT ON FUNCTION pg_catalog.get_rebalance_table_shards_plan(regclass, float4, int, bigint[], boolean, name) 
IS 'returns the list of shard placement moves to be done on a rebalance operation'; + diff --git a/src/backend/distributed/sql/udfs/master_drain_node/9.2-1.sql b/src/backend/distributed/sql/udfs/master_drain_node/9.2-1.sql new file mode 100644 index 000000000..40312d03d --- /dev/null +++ b/src/backend/distributed/sql/udfs/master_drain_node/9.2-1.sql @@ -0,0 +1,14 @@ +DROP FUNCTION pg_catalog.master_drain_node; +CREATE FUNCTION pg_catalog.master_drain_node( + nodename text, + nodeport integer, + shard_transfer_mode citus.shard_transfer_mode default 'auto', + rebalance_strategy name default NULL + ) + RETURNS VOID + LANGUAGE C + AS 'MODULE_PATHNAME', $$master_drain_node$$; +COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode,name) + IS 'mark a node to be drained of data and actually drain it as well'; + +REVOKE ALL ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode,name) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/master_drain_node/latest.sql b/src/backend/distributed/sql/udfs/master_drain_node/latest.sql index 02b536f9a..40312d03d 100644 --- a/src/backend/distributed/sql/udfs/master_drain_node/latest.sql +++ b/src/backend/distributed/sql/udfs/master_drain_node/latest.sql @@ -1,11 +1,14 @@ +DROP FUNCTION pg_catalog.master_drain_node; CREATE FUNCTION pg_catalog.master_drain_node( nodename text, nodeport integer, - shard_transfer_mode citus.shard_transfer_mode default 'auto') + shard_transfer_mode citus.shard_transfer_mode default 'auto', + rebalance_strategy name default NULL + ) RETURNS VOID - LANGUAGE C STRICT + LANGUAGE C AS 'MODULE_PATHNAME', $$master_drain_node$$; -COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode) +COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode,name) IS 'mark a node to be drained of data and actually drain it as well'; -REVOKE ALL ON FUNCTION 
pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode) FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.master_drain_node(text,int,citus.shard_transfer_mode,name) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/pg_dist_rebalance_strategy_trigger_func/9.2-1.sql b/src/backend/distributed/sql/udfs/pg_dist_rebalance_strategy_trigger_func/9.2-1.sql new file mode 100644 index 000000000..8e814c5bc --- /dev/null +++ b/src/backend/distributed/sql/udfs/pg_dist_rebalance_strategy_trigger_func/9.2-1.sql @@ -0,0 +1,28 @@ +-- Ensures that only a single default strategy is possible +CREATE OR REPLACE FUNCTION citus_internal.pg_dist_rebalance_strategy_trigger_func() +RETURNS TRIGGER AS $$ + BEGIN + -- citus_add_rebalance_strategy also takes out a ShareRowExclusiveLock + LOCK TABLE pg_dist_rebalance_strategy IN SHARE ROW EXCLUSIVE MODE; + + PERFORM citus_validate_rebalance_strategy_functions( + NEW.shard_cost_function, + NEW.node_capacity_function, + NEW.shard_allowed_on_node_function); + + IF NEW.default_threshold < NEW.minimum_threshold THEN + RAISE EXCEPTION 'default_threshold cannot be smaller than minimum_threshold'; + END IF; + + IF NOT NEW.default_strategy THEN + RETURN NEW; + END IF; + IF TG_OP = 'UPDATE' AND NEW.default_strategy = OLD.default_strategy THEN + return NEW; + END IF; + IF EXISTS (SELECT 1 FROM pg_dist_rebalance_strategy WHERE default_strategy) THEN + RAISE EXCEPTION 'there cannot be two default strategies'; + END IF; + RETURN NEW; + END; +$$ LANGUAGE plpgsql; diff --git a/src/backend/distributed/sql/udfs/pg_dist_rebalance_strategy_trigger_func/latest.sql b/src/backend/distributed/sql/udfs/pg_dist_rebalance_strategy_trigger_func/latest.sql new file mode 100644 index 000000000..8e814c5bc --- /dev/null +++ b/src/backend/distributed/sql/udfs/pg_dist_rebalance_strategy_trigger_func/latest.sql @@ -0,0 +1,28 @@ +-- Ensures that only a single default strategy is possible +CREATE OR REPLACE FUNCTION 
citus_internal.pg_dist_rebalance_strategy_trigger_func() +RETURNS TRIGGER AS $$ + BEGIN + -- citus_add_rebalance_strategy also takes out a ShareRowExclusiveLock + LOCK TABLE pg_dist_rebalance_strategy IN SHARE ROW EXCLUSIVE MODE; + + PERFORM citus_validate_rebalance_strategy_functions( + NEW.shard_cost_function, + NEW.node_capacity_function, + NEW.shard_allowed_on_node_function); + + IF NEW.default_threshold < NEW.minimum_threshold THEN + RAISE EXCEPTION 'default_threshold cannot be smaller than minimum_threshold'; + END IF; + + IF NOT NEW.default_strategy THEN + RETURN NEW; + END IF; + IF TG_OP = 'UPDATE' AND NEW.default_strategy = OLD.default_strategy THEN + return NEW; + END IF; + IF EXISTS (SELECT 1 FROM pg_dist_rebalance_strategy WHERE default_strategy) THEN + RAISE EXCEPTION 'there cannot be two default strategies'; + END IF; + RETURN NEW; + END; +$$ LANGUAGE plpgsql; diff --git a/src/backend/distributed/sql/udfs/rebalance_table_shards/9.2-1.sql b/src/backend/distributed/sql/udfs/rebalance_table_shards/9.2-1.sql new file mode 100644 index 000000000..52d4128d0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/rebalance_table_shards/9.2-1.sql @@ -0,0 +1,18 @@ +-- rebalance_table_shards uses the shard rebalancer's C UDF functions to rebalance +-- shards of the given relation. 
+-- +DROP FUNCTION pg_catalog.rebalance_table_shards; +CREATE OR REPLACE FUNCTION pg_catalog.rebalance_table_shards( + relation regclass default NULL, + threshold float4 default NULL, + max_shard_moves int default 1000000, + excluded_shard_list bigint[] default '{}', + shard_transfer_mode citus.shard_transfer_mode default 'auto', + drain_only boolean default false, + rebalance_strategy name default NULL + ) + RETURNS VOID + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.rebalance_table_shards(regclass, float4, int, bigint[], citus.shard_transfer_mode, boolean, name) + IS 'rebalance the shards of the given table across the worker nodes (including colocated shards of other tables)'; diff --git a/src/backend/distributed/sql/udfs/rebalance_table_shards/latest.sql b/src/backend/distributed/sql/udfs/rebalance_table_shards/latest.sql index 434d36ad1..52d4128d0 100644 --- a/src/backend/distributed/sql/udfs/rebalance_table_shards/latest.sql +++ b/src/backend/distributed/sql/udfs/rebalance_table_shards/latest.sql @@ -4,13 +4,15 @@ DROP FUNCTION pg_catalog.rebalance_table_shards; CREATE OR REPLACE FUNCTION pg_catalog.rebalance_table_shards( relation regclass default NULL, - threshold float4 default 0, + threshold float4 default NULL, max_shard_moves int default 1000000, excluded_shard_list bigint[] default '{}', shard_transfer_mode citus.shard_transfer_mode default 'auto', - drain_only boolean default false) + drain_only boolean default false, + rebalance_strategy name default NULL + ) RETURNS VOID AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE; -COMMENT ON FUNCTION pg_catalog.rebalance_table_shards(regclass, float4, int, bigint[], citus.shard_transfer_mode, boolean) +COMMENT ON FUNCTION pg_catalog.rebalance_table_shards(regclass, float4, int, bigint[], citus.shard_transfer_mode, boolean, name) IS 'rebalance the shards of the given table across the worker nodes (including colocated shards of other tables)'; diff --git 
a/src/include/distributed/argutils.h b/src/include/distributed/argutils.h new file mode 100644 index 000000000..38efbdc77 --- /dev/null +++ b/src/include/distributed/argutils.h @@ -0,0 +1,53 @@ +/*------------------------------------------------------------------------- + * + * argutils.h + * + * Macros to help with argument parsing in UDFs. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +/* + * PG_ENSURE_ARGNOTNULL ensures that a UDF argument is not NULL and throws an + * error otherwise. This is useful for non STRICT UDFs where only some + * arguments are allowed to be NULL. + */ +#define PG_ENSURE_ARGNOTNULL(argIndex, argName) \ + if (PG_ARGISNULL(argIndex)) \ + { \ + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), \ + errmsg("%s cannot be NULL", argName))); \ + } + +/* + * PG_GETARG_TEXT_TO_CSTRING is the same as PG_GETARG_TEXT_P, but instead of + * text* it returns char*. Just like most other PG_GETARG_* macros this assumes + * the argument is not NULL. + */ +#define PG_GETARG_TEXT_TO_CSTRING(argIndex) \ + text_to_cstring(PG_GETARG_TEXT_P(argIndex)) + +/* + * PG_GETARG_TEXT_TO_CSTRING_OR_NULL is the same as PG_GETARG_TEXT_TO_CSTRING, + * but it supports the case where the argument is NULL. In this case it will + * return a NULL pointer. + */ +#define PG_GETARG_TEXT_TO_CSTRING_OR_NULL(argIndex) \ + PG_ARGISNULL(argIndex) ? NULL : PG_GETARG_TEXT_TO_CSTRING(argIndex) + +/* + * PG_GETARG_NAME_OR_NULL is the same as PG_GETARG_NAME, but it supports the + * case where the argument is NULL. In this case it will return a NULL pointer. + */ +#define PG_GETARG_NAME_OR_NULL(argIndex) \ + PG_ARGISNULL(argIndex) ? NULL : PG_GETARG_NAME(argIndex) + +/* + * PG_GETARG_FLOAT4_OR_DEFAULT is the same as PG_GETARG_FLOAT4, but it supports the + * case where the argument is NULL. In that case it will return the provided + * fallback. 
+ */ +#define PG_GETARG_FLOAT4_OR_DEFAULT(argIndex, fallback) \ + PG_ARGISNULL(argIndex) ? (fallback) : PG_GETARG_FLOAT4(argIndex) diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 4fc7afd5c..789ccd142 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -112,6 +112,8 @@ extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); extern List * BuildShardPlacementList(ShardInterval *shardInterval); extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); +extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, + char *sizeQuery); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index e3d9a5085..0af8434e6 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -165,6 +165,7 @@ extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistPlacementRelationId(void); extern Oid DistNodeRelationId(void); +extern Oid DistRebalanceStrategyRelationId(void); extern Oid DistLocalGroupIdRelationId(void); extern Oid DistObjectRelationId(void); extern Oid DistEnabledCustomAggregatesId(void); diff --git a/src/include/distributed/pg_dist_rebalance_strategy.h b/src/include/distributed/pg_dist_rebalance_strategy.h new file mode 100644 index 000000000..148c772cc --- /dev/null +++ b/src/include/distributed/pg_dist_rebalance_strategy.h @@ -0,0 +1,54 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_rebalance_strategy.h + * definition of the "rebalance strategy" relation (pg_dist_rebalance_strategy). 
+ * + * This table contains all the available strategies for rebalancing. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PG_DIST_REBALANCE_STRATEGY_H +#define PG_DIST_REBALANCE_STRATEGY_H + +#include "postgres.h" + + +/* ---------------- + * pg_dist_rebalance_strategy definition. + * ---------------- + */ +typedef struct FormData_pg_dist_rebalance_strategy +{ + NameData name; /* user readable name of the strategy */ + bool default_strategy; /* if this strategy is the default strategy */ + Oid shardCostFunction; /* function to calculate the shard cost */ + Oid nodeCapacityFunction; /* function to get the capacity of a node */ + Oid shardAllowedOnNodeFunction; /* function to check if shard is allowed on node */ + float4 defaultThreshold; /* default threshold that is used */ + float4 minimumThreshold; /* minimum threshold that is allowed */ +} FormData_pg_dist_rebalance_strategy; + +/* ---------------- + * Form_pg_dist_rebalance_strategy corresponds to a pointer to a tuple with + * the format of pg_dist_rebalance_strategy relation. 
+ * ---------------- + */ +typedef FormData_pg_dist_rebalance_strategy *Form_pg_dist_rebalance_strategy; + +/* ---------------- + * compiler constants for pg_dist_rebalance_strategy + * ---------------- + */ +#define Natts_pg_dist_rebalance_strategy 7 +#define Anum_pg_dist_rebalance_strategy_name 1 +#define Anum_pg_dist_rebalance_strategy_default_strategy 2 +#define Anum_pg_dist_rebalance_strategy_shard_cost_function 3 +#define Anum_pg_dist_rebalance_strategy_node_capacity_function 4 +#define Anum_pg_dist_rebalance_strategy_shard_allowed_on_node_function 5 +#define Anum_pg_dist_rebalance_strategy_default_threshold 6 +#define Anum_pg_dist_rebalance_strategy_minimum_threshold 7 + +#endif /* PG_DIST_REBALANCE_STRATEGY_H */ diff --git a/src/test/regress/after_pg_upgrade_schedule b/src/test/regress/after_pg_upgrade_schedule index e06ae46a4..243325c51 100644 --- a/src/test/regress/after_pg_upgrade_schedule +++ b/src/test/regress/after_pg_upgrade_schedule @@ -1 +1 @@ -test: upgrade_basic_after upgrade_type_after upgrade_ref2ref_after upgrade_distributed_function_after +test: upgrade_basic_after upgrade_type_after upgrade_ref2ref_after upgrade_distributed_function_after upgrade_rebalance_strategy_after diff --git a/src/test/regress/before_pg_upgrade_schedule b/src/test/regress/before_pg_upgrade_schedule index dd966dccd..a4590c073 100644 --- a/src/test/regress/before_pg_upgrade_schedule +++ b/src/test/regress/before_pg_upgrade_schedule @@ -2,4 +2,4 @@ test: multi_test_helpers test: multi_test_catalog_views test: upgrade_basic_before -test: upgrade_type_before upgrade_ref2ref_before upgrade_distributed_function_before +test: upgrade_type_before upgrade_ref2ref_before upgrade_distributed_function_before upgrade_rebalance_strategy_before diff --git a/src/test/regress/expected/multi_utility_warnings.out b/src/test/regress/expected/multi_utility_warnings.out index 76d5a8325..6a417ef96 100644 --- a/src/test/regress/expected/multi_utility_warnings.out +++ 
b/src/test/regress/expected/multi_utility_warnings.out @@ -25,3 +25,7 @@ ERROR: cannot write to pg_dist_poolinfo DETAIL: Citus Community Edition does not support the use of pooler options. HINT: To learn more about using advanced pooling schemes with Citus, please contact us at https://citusdata.com/about/contact_us ROLLBACK; +INSERT INTO pg_dist_rebalance_strategy VALUES ('should fail', false, 'citus_shard_cost_1', 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0, 0); +ERROR: cannot write to pg_dist_rebalance_strategy +DETAIL: Citus Community Edition does not support the use of custom rebalance strategies. +HINT: To learn more about using advanced rebalancing schemes with Citus, please contact us at https://citusdata.com/about/contact_us diff --git a/src/test/regress/expected/upgrade_rebalance_strategy_after.out b/src/test/regress/expected/upgrade_rebalance_strategy_after.out new file mode 100644 index 000000000..4bfe9ed4c --- /dev/null +++ b/src/test/regress/expected/upgrade_rebalance_strategy_after.out @@ -0,0 +1,8 @@ +SELECT * FROM pg_catalog.pg_dist_rebalance_strategy ORDER BY name; + name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold +-----------------+------------------+-----------------------------------------+---------------------------------------------------+------------------------------------------+-------------------+------------------- + by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01 + by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0 + custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 +(3 rows) + diff --git a/src/test/regress/expected/upgrade_rebalance_strategy_before.out 
b/src/test/regress/expected/upgrade_rebalance_strategy_before.out new file mode 100644 index 000000000..327d05ec2 --- /dev/null +++ b/src/test/regress/expected/upgrade_rebalance_strategy_before.out @@ -0,0 +1,38 @@ +CREATE SCHEMA upgrade_rebalance_strategy; +SET search_path TO upgrade_rebalance_strategy, public; +-- The following function signatures should always keep working +CREATE FUNCTION shard_cost_2(bigint) + RETURNS float4 AS $$ SELECT 2.0::float4 $$ LANGUAGE sql; +CREATE FUNCTION capacity_high_worker_1(nodeidarg int) + RETURNS real AS $$ + SELECT + (CASE WHEN nodeport = 57637 THEN 1000 ELSE 1 END)::real + FROM pg_dist_node where nodeid = nodeidarg + $$ LANGUAGE sql; +CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int) + RETURNS boolean AS $$ + SELECT + (CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END) + FROM pg_dist_node where nodeid = nodeidarg + $$ LANGUAGE sql; +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger; +SELECT citus_add_rebalance_strategy( + 'custom_strategy', + 'shard_cost_2', + 'capacity_high_worker_1', + 'only_worker_2', + 0.5, + 0.2 + ); + citus_add_rebalance_strategy +------------------------------ + +(1 row) + +SELECT citus_set_default_rebalance_strategy('custom_strategy'); + citus_set_default_rebalance_strategy +-------------------------------------- + +(1 row) + +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger; diff --git a/src/test/regress/sql/multi_utility_warnings.sql b/src/test/regress/sql/multi_utility_warnings.sql index 3a7b0a910..296e4f3c2 100644 --- a/src/test/regress/sql/multi_utility_warnings.sql +++ b/src/test/regress/sql/multi_utility_warnings.sql @@ -21,3 +21,4 @@ BEGIN; INSERT INTO pg_dist_node VALUES (1234567890, 1234567890, 'localhost', 5432); INSERT INTO pg_dist_poolinfo VALUES (1234567890, 'port=1234'); ROLLBACK; +INSERT INTO pg_dist_rebalance_strategy VALUES ('should 
fail', false, 'citus_shard_cost_1', 'citus_node_capacity_1', 'citus_shard_allowed_on_node_true', 0, 0); diff --git a/src/test/regress/sql/upgrade_rebalance_strategy_after.sql b/src/test/regress/sql/upgrade_rebalance_strategy_after.sql new file mode 100644 index 000000000..f1345a9a8 --- /dev/null +++ b/src/test/regress/sql/upgrade_rebalance_strategy_after.sql @@ -0,0 +1 @@ +SELECT * FROM pg_catalog.pg_dist_rebalance_strategy ORDER BY name; diff --git a/src/test/regress/sql/upgrade_rebalance_strategy_before.sql b/src/test/regress/sql/upgrade_rebalance_strategy_before.sql new file mode 100644 index 000000000..b76aa4ccd --- /dev/null +++ b/src/test/regress/sql/upgrade_rebalance_strategy_before.sql @@ -0,0 +1,32 @@ +CREATE SCHEMA upgrade_rebalance_strategy; +SET search_path TO upgrade_rebalance_strategy, public; + +-- The following function signatures should always keep working +CREATE FUNCTION shard_cost_2(bigint) + RETURNS float4 AS $$ SELECT 2.0::float4 $$ LANGUAGE sql; + +CREATE FUNCTION capacity_high_worker_1(nodeidarg int) + RETURNS real AS $$ + SELECT + (CASE WHEN nodeport = 57637 THEN 1000 ELSE 1 END)::real + FROM pg_dist_node where nodeid = nodeidarg + $$ LANGUAGE sql; + +CREATE FUNCTION only_worker_2(shardid bigint, nodeidarg int) + RETURNS boolean AS $$ + SELECT + (CASE WHEN nodeport = 57638 THEN TRUE ELSE FALSE END) + FROM pg_dist_node where nodeid = nodeidarg + $$ LANGUAGE sql; +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger; + +SELECT citus_add_rebalance_strategy( + 'custom_strategy', + 'shard_cost_2', + 'capacity_high_worker_1', + 'only_worker_2', + 0.5, + 0.2 + ); +SELECT citus_set_default_rebalance_strategy('custom_strategy'); +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger;