mirror of https://github.com/citusdata/citus.git
v1 cont
parent
73a81e923b
commit
b5e3ea69c8
72
dist_key.sql
72
dist_key.sql
|
@ -1,72 +0,0 @@
|
||||||
-- CREATE VIEW citus_shard_key_candidates AS
|
|
||||||
WITH fkeys AS (
|
|
||||||
select distinct on (freloid, confkey)
|
|
||||||
c.oid as reloid, cf.oid as freloid,
|
|
||||||
(select string_agg(attname, ',')
|
|
||||||
from pg_attribute a
|
|
||||||
where a.attrelid = r.conrelid
|
|
||||||
and array[a.attnum::integer] <@ conkey::integer[]
|
|
||||||
) as conkey,
|
|
||||||
(select string_agg(attname, ',')
|
|
||||||
from pg_attribute a
|
|
||||||
where a.attrelid = r.confrelid
|
|
||||||
and array[a.attnum::integer] <@ confkey::integer[]
|
|
||||||
) as confkey
|
|
||||||
from pg_catalog.pg_constraint r
|
|
||||||
JOIN pg_class c on r.conrelid = c.oid
|
|
||||||
JOIN pg_namespace n on c.relnamespace = n.oid
|
|
||||||
JOIN pg_class cf on r.confrelid = cf.oid
|
|
||||||
JOIN pg_namespace nf on cf.relnamespace = nf.oid
|
|
||||||
JOIN pg_depend d on d.classid = 'pg_constraint'::regclass
|
|
||||||
and d.objid = r.oid
|
|
||||||
and d.refobjsubid = 0
|
|
||||||
where r.contype = 'f'
|
|
||||||
AND c.relkind in ('r', 'f', 'p')
|
|
||||||
AND cf.relkind in ('r', 'f', 'p')
|
|
||||||
AND n.nspname !~ '^pg_' and n.nspname <> 'information_schema'
|
|
||||||
AND nf.nspname !~ '^pg_' and nf.nspname <> 'information_schema'
|
|
||||||
AND c.relkind = 'r' and c.relpersistence = 'p'
|
|
||||||
AND cf.relkind = 'r' and cf.relpersistence = 'p'
|
|
||||||
)
|
|
||||||
select n.nspname as schema_name, c.relname as table_name,
|
|
||||||
CASE WHEN pkeys.attname IS NULL THEN 'reference' ELSE 'distributed' END AS citus_table_type,
|
|
||||||
pkeys.attname as distribution_key,
|
|
||||||
CASE
|
|
||||||
WHEN pkeys.attname IS NOT NULL
|
|
||||||
THEN json_agg(json_object(array['table', 'column'], array[fkeys_d.reloid::regclass::text, fkeys_d.conkey]))
|
|
||||||
WHEN COUNT(fkeys_r.reloid) > 0
|
|
||||||
THEN json_agg(json_object(array['table', 'column'], array[fkeys_r.freloid::regclass::text, fkeys_r.confkey]))
|
|
||||||
END as related_columns
|
|
||||||
from pg_catalog.pg_class c
|
|
||||||
join pg_catalog.pg_namespace n on c.relnamespace = n.oid
|
|
||||||
join pg_roles auth ON auth.oid = c.relowner
|
|
||||||
left join lateral (
|
|
||||||
select indrelid, indexrelid, a.attname
|
|
||||||
from pg_index x
|
|
||||||
join pg_class i on i.oid = x.indexrelid
|
|
||||||
join pg_attribute a on a.attrelid = c.oid and attnum = indkey[0]
|
|
||||||
where x.indrelid = c.oid
|
|
||||||
and (indisprimary or indisunique)
|
|
||||||
and array_length(indkey::integer[], 1) = 1
|
|
||||||
and atttypid in ('smallint'::regtype,
|
|
||||||
'int'::regtype,
|
|
||||||
'bigint'::regtype)
|
|
||||||
order by not indisprimary, not indisunique
|
|
||||||
limit 1
|
|
||||||
) as pkeys on true
|
|
||||||
left join fkeys AS fkeys_d ON (fkeys_d.freloid = c.oid)
|
|
||||||
left join fkeys AS fkeys_r ON (fkeys_r.reloid = c.oid)
|
|
||||||
where relkind = 'r' and c.relpersistence = 'p'
|
|
||||||
and n.nspname !~ '^pg_' and n.nspname <> 'information_schema'
|
|
||||||
and not exists
|
|
||||||
(
|
|
||||||
select 1
|
|
||||||
from pg_depend d
|
|
||||||
where d.classid = 'pg_class'::regclass
|
|
||||||
and d.objid = c.oid
|
|
||||||
and d.deptype = 'e'
|
|
||||||
)
|
|
||||||
and not exists (select 1 from pg_dist_partition where logicalrelid = c.oid)
|
|
||||||
group by 1,2,3,4
|
|
||||||
order by citus_table_type, n.nspname, c.relname;
|
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
#include "postgres.h"
|
|
||||||
#include "fmgr.h"
|
|
||||||
#include "storage/lockdefs.h"
|
|
||||||
#include "utils/relcache.h"
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(pick_dist_key);
|
|
||||||
|
|
||||||
Datum
|
|
||||||
pick_dist_key(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
if (PG_ARGISNULL(0))
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("table name cannot be null")));
|
|
||||||
}
|
|
||||||
|
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
|
||||||
|
|
||||||
Relation relation = try_relation_open(relationId, AccessShareLock);
|
|
||||||
if (relation == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("no such table exists")));
|
|
||||||
}
|
|
||||||
|
|
||||||
relation_close(relation, NoLock);
|
|
||||||
|
|
||||||
char *distributionColumnName = text_to_cstring(distributionColumnText);
|
|
||||||
Assert(distributionColumnName != NULL);
|
|
||||||
|
|
||||||
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
|
||||||
|
|
||||||
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("%d is outside the valid range for "
|
|
||||||
"parameter \"shard_count\" (1 .. %d)",
|
|
||||||
shardCount, MAX_SHARD_COUNT)));
|
|
||||||
}
|
|
||||||
|
|
||||||
CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
|
|
||||||
shardCount, shardCountIsStrict, colocateWithTableName);
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
|
|
@ -27,3 +27,142 @@ INSERT INTO pg_dist_cleanup
|
||||||
WHERE plc.shardstate = 4;
|
WHERE plc.shardstate = 4;
|
||||||
|
|
||||||
DELETE FROM pg_dist_placement WHERE shardstate = 4;
|
DELETE FROM pg_dist_placement WHERE shardstate = 4;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Use the following DDLs for testing:
|
||||||
|
|
||||||
|
CREATE TABLE companies (
|
||||||
|
id bigserial PRIMARY KEY,
|
||||||
|
name text NOT NULL,
|
||||||
|
image_url text,
|
||||||
|
created_at timestamp without time zone NOT NULL,
|
||||||
|
updated_at timestamp without time zone NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE campaigns (
|
||||||
|
id bigserial, -- was: PRIMARY KEY
|
||||||
|
company_id bigint REFERENCES companies (id),
|
||||||
|
name text NOT NULL,
|
||||||
|
cost_model text NOT NULL,
|
||||||
|
state text NOT NULL,
|
||||||
|
monthly_budget bigint,
|
||||||
|
blacklisted_site_urls text[],
|
||||||
|
created_at timestamp without time zone NOT NULL,
|
||||||
|
updated_at timestamp without time zone NOT NULL,
|
||||||
|
PRIMARY KEY (company_id, id) -- added
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE ads (
|
||||||
|
id bigserial, -- was: PRIMARY KEY
|
||||||
|
company_id bigint, -- added
|
||||||
|
campaign_id bigint, -- was: REFERENCES campaigns (id)
|
||||||
|
name text NOT NULL,
|
||||||
|
image_url text,
|
||||||
|
target_url text,
|
||||||
|
impressions_count bigint DEFAULT 0,
|
||||||
|
clicks_count bigint DEFAULT 0,
|
||||||
|
created_at timestamp without time zone NOT NULL,
|
||||||
|
updated_at timestamp without time zone NOT NULL,
|
||||||
|
PRIMARY KEY (company_id, id), -- added
|
||||||
|
FOREIGN KEY (company_id, campaign_id) -- added
|
||||||
|
REFERENCES campaigns (company_id, id)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE clicks (
|
||||||
|
id bigserial, -- was: PRIMARY KEY
|
||||||
|
company_id bigint, -- added
|
||||||
|
ad_id bigint, -- was: REFERENCES ads (id),
|
||||||
|
clicked_at timestamp without time zone NOT NULL,
|
||||||
|
site_url text NOT NULL,
|
||||||
|
cost_per_click_usd numeric(20,10),
|
||||||
|
user_ip inet NOT NULL,
|
||||||
|
user_data jsonb NOT NULL,
|
||||||
|
PRIMARY KEY (company_id, id), -- added
|
||||||
|
FOREIGN KEY (company_id, ad_id) -- added
|
||||||
|
REFERENCES ads (company_id, id)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE impressions (
|
||||||
|
id bigserial, -- was: PRIMARY KEY
|
||||||
|
company_id bigint, -- added
|
||||||
|
ad_id bigint, -- was: REFERENCES ads (id),
|
||||||
|
seen_at timestamp without time zone NOT NULL,
|
||||||
|
site_url text NOT NULL,
|
||||||
|
cost_per_impression_usd numeric(20,10),
|
||||||
|
user_ip inet NOT NULL,
|
||||||
|
user_data jsonb NOT NULL,
|
||||||
|
PRIMARY KEY (company_id, id), -- added
|
||||||
|
FOREIGN KEY (company_id, ad_id) -- added
|
||||||
|
REFERENCES ads (company_id, id)
|
||||||
|
);
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
CREATE OR REPLACE VIEW citus_shard_key_candidates AS
|
||||||
|
WITH fkeys AS (
|
||||||
|
select
|
||||||
|
c.oid as reloid, cf.oid as freloid,
|
||||||
|
(select string_agg(attname, ',')
|
||||||
|
from pg_attribute a
|
||||||
|
where a.attrelid = r.conrelid
|
||||||
|
and array[a.attnum::integer] <@ conkey::integer[]
|
||||||
|
) as conkey,
|
||||||
|
(select string_agg(attname, ',')
|
||||||
|
from pg_attribute a
|
||||||
|
where a.attrelid = r.confrelid
|
||||||
|
and array[a.attnum::integer] <@ confkey::integer[]
|
||||||
|
) as confkey
|
||||||
|
from pg_catalog.pg_constraint r
|
||||||
|
JOIN pg_class c on r.conrelid = c.oid
|
||||||
|
JOIN pg_namespace n on c.relnamespace = n.oid
|
||||||
|
JOIN pg_class cf on r.confrelid = cf.oid
|
||||||
|
JOIN pg_namespace nf on cf.relnamespace = nf.oid
|
||||||
|
JOIN pg_depend d on d.classid = 'pg_constraint'::regclass
|
||||||
|
and d.objid = r.oid
|
||||||
|
and d.refobjsubid = 0
|
||||||
|
where r.contype = 'f'
|
||||||
|
AND n.nspname !~ '^pg_' and n.nspname <> 'information_schema'
|
||||||
|
AND nf.nspname !~ '^pg_' and nf.nspname <> 'information_schema'
|
||||||
|
)
|
||||||
|
select n.nspname as schema_name, c.relname as table_name,
|
||||||
|
CASE WHEN pkeys.attname IS NULL THEN 'reference' ELSE 'distributed' END AS citus_table_type,
|
||||||
|
pkeys.attname as distribution_key,
|
||||||
|
CASE
|
||||||
|
WHEN COUNT(fkeys_d.reloid) > 0
|
||||||
|
THEN json_agg(json_object(array['table', 'column'], array[fkeys_d.reloid::regclass::text, fkeys_d.conkey]) order by fkeys_d.reloid::regclass::text)
|
||||||
|
WHEN COUNT(fkeys_r.freloid) > 0
|
||||||
|
THEN json_agg(json_object(array['table', 'column'], array[fkeys_r.freloid::regclass::text, fkeys_r.confkey]) order by fkeys_r.freloid::regclass::text)
|
||||||
|
END as related_columns
|
||||||
|
from pg_catalog.pg_class c
|
||||||
|
join pg_catalog.pg_namespace n on c.relnamespace = n.oid
|
||||||
|
join pg_roles auth ON auth.oid = c.relowner
|
||||||
|
left join lateral (
|
||||||
|
select indrelid, indexrelid, a.attname
|
||||||
|
from pg_index x
|
||||||
|
join pg_class i on i.oid = x.indexrelid
|
||||||
|
join pg_attribute a on a.attrelid = c.oid and attnum = indkey[0]
|
||||||
|
where x.indrelid = c.oid
|
||||||
|
and (indisprimary or indisunique)
|
||||||
|
and array_length(indkey::integer[], 1) = 1
|
||||||
|
and atttypid in ('smallint'::regtype,
|
||||||
|
'int'::regtype,
|
||||||
|
'bigint'::regtype)
|
||||||
|
order by not indisprimary, not indisunique
|
||||||
|
limit 1
|
||||||
|
) as pkeys on true
|
||||||
|
left join fkeys AS fkeys_d ON (fkeys_d.freloid = c.oid)
|
||||||
|
left join fkeys AS fkeys_r ON (fkeys_r.reloid = c.oid)
|
||||||
|
where relkind = 'r'
|
||||||
|
and n.nspname !~ '^pg_' and n.nspname <> 'information_schema'
|
||||||
|
and not exists
|
||||||
|
(
|
||||||
|
select 1
|
||||||
|
from pg_depend d
|
||||||
|
where d.classid = 'pg_class'::regclass
|
||||||
|
and d.objid = c.oid
|
||||||
|
and d.deptype = 'e'
|
||||||
|
)
|
||||||
|
and not exists (select 1 from pg_dist_partition where logicalrelid = c.oid)
|
||||||
|
group by schema_name,table_name,citus_table_type,distribution_key
|
||||||
|
order by citus_table_type, n.nspname, c.relname;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue