From 5c9ca154f95636e798d3e12582feeab9e3f36aaf Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Tue, 17 Jan 2023 15:23:08 +0300 Subject: [PATCH] v1 --- dist_key.sql | 72 +++++++++++++++++++ .../distributed/commands/pick_dist_key.c | 42 +++++++++++ 2 files changed, 114 insertions(+) create mode 100644 dist_key.sql create mode 100644 src/backend/distributed/commands/pick_dist_key.c diff --git a/dist_key.sql b/dist_key.sql new file mode 100644 index 000000000..086502de8 --- /dev/null +++ b/dist_key.sql @@ -0,0 +1,72 @@ +-- CREATE VIEW citus_shard_key_candidates AS +WITH fkeys AS ( + select distinct on (freloid, confkey) + c.oid as reloid, cf.oid as freloid, + (select string_agg(attname, ',') + from pg_attribute a + where a.attrelid = r.conrelid + and array[a.attnum::integer] <@ conkey::integer[] + ) as conkey, + (select string_agg(attname, ',') + from pg_attribute a + where a.attrelid = r.confrelid + and array[a.attnum::integer] <@ confkey::integer[] + ) as confkey + from pg_catalog.pg_constraint r + JOIN pg_class c on r.conrelid = c.oid + JOIN pg_namespace n on c.relnamespace = n.oid + JOIN pg_class cf on r.confrelid = cf.oid + JOIN pg_namespace nf on cf.relnamespace = nf.oid + JOIN pg_depend d on d.classid = 'pg_constraint'::regclass + and d.objid = r.oid + and d.refobjsubid = 0 + where r.contype = 'f' + AND c.relkind in ('r', 'f', 'p') + AND cf.relkind in ('r', 'f', 'p') + AND n.nspname !~ '^pg_' and n.nspname <> 'information_schema' + AND nf.nspname !~ '^pg_' and nf.nspname <> 'information_schema' + AND c.relkind = 'r' and c.relpersistence = 'p' + AND cf.relkind = 'r' and cf.relpersistence = 'p' +) +select n.nspname as schema_name, c.relname as table_name, + CASE WHEN pkeys.attname IS NULL THEN 'reference' ELSE 'distributed' END AS citus_table_type, + pkeys.attname as distribution_key, + CASE + WHEN pkeys.attname IS NOT NULL + THEN json_agg(json_object(array['table', 'column'], array[fkeys_d.reloid::regclass::text, fkeys_d.conkey])) + WHEN COUNT(fkeys_r.reloid) > 0 + THEN json_agg(json_object(array['table', 'column'], array[fkeys_r.freloid::regclass::text, fkeys_r.confkey])) + END as related_columns + from pg_catalog.pg_class c + join pg_catalog.pg_namespace n on c.relnamespace = n.oid + join pg_roles auth ON auth.oid = c.relowner + left join lateral ( + select indrelid, indexrelid, a.attname + from pg_index x + join pg_class i on i.oid = x.indexrelid + join pg_attribute a on a.attrelid = c.oid and attnum = indkey[0] + where x.indrelid = c.oid + and (indisprimary or indisunique) + and array_length(indkey::integer[], 1) = 1 + and atttypid in ('smallint'::regtype, + 'int'::regtype, + 'bigint'::regtype) + order by not indisprimary, not indisunique + limit 1 + ) as pkeys on true + left join fkeys AS fkeys_d ON (fkeys_d.freloid = c.oid) + left join fkeys AS fkeys_r ON (fkeys_r.reloid = c.oid) + where relkind = 'r' and c.relpersistence = 'p' + and n.nspname !~ '^pg_' and n.nspname <> 'information_schema' + and not exists + ( + select 1 + from pg_depend d + where d.classid = 'pg_class'::regclass + and d.objid = c.oid + and d.deptype = 'e' + ) + and not exists (select 1 from pg_dist_partition where logicalrelid = c.oid) +group by 1,2,3,4 +order by citus_table_type, n.nspname, c.relname; + diff --git a/src/backend/distributed/commands/pick_dist_key.c b/src/backend/distributed/commands/pick_dist_key.c new file mode 100644 index 000000000..7c3d6b90b --- /dev/null +++ b/src/backend/distributed/commands/pick_dist_key.c @@ -0,0 +1,42 @@ +#include "postgres.h" +#include "fmgr.h" +#include "storage/lockdefs.h" +#include "utils/relcache.h" + +PG_FUNCTION_INFO_V1(pick_dist_key); + +Datum +pick_dist_key(PG_FUNCTION_ARGS) +{ + if (PG_ARGISNULL(0)) + { + ereport(ERROR, (errmsg("table name cannot be null"))); + } + + Oid relationId = PG_GETARG_OID(0); + + Relation relation = try_relation_open(relationId, AccessShareLock); + if (relation == NULL) + { + ereport(ERROR, (errmsg("no such table exists"))); + } + + relation_close(relation, NoLock); + + char *distributionColumnName = text_to_cstring(distributionColumnText); + Assert(distributionColumnName != NULL); + + char distributionMethod = LookupDistributionMethod(distributionMethodOid); + + if (shardCount < 1 || shardCount > MAX_SHARD_COUNT) + { + ereport(ERROR, (errmsg("%d is outside the valid range for " + "parameter \"shard_count\" (1 .. %d)", + shardCount, MAX_SHARD_COUNT))); + } + + CreateDistributedTable(relationId, distributionColumnName, distributionMethod, + shardCount, shardCountIsStrict, colocateWithTableName); + + PG_RETURN_VOID(); +}