Add get_distribution_value_shardid UDF (#1048)

* Add get_distribution_value_shardid UDF

With this UDF users can now map given distribution value to shard id. We mostly hide
shardids from users to prevent unnecessary complexity but some power users might need
to know about which entry/value is stored in which shard for maintanence purposes.

Signature of this UDF is as follows;

bigint get_distribution_value_shardid(table_name regclass, distribution_value anyelement)
pull/1056/head
Burak Yücesoy 2016-12-22 12:17:08 +03:00 committed by GitHub
parent ce3fec00e5
commit 501a2ecead
9 changed files with 462 additions and 3 deletions

View File

@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \ 5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -107,6 +107,8 @@ $(EXTENSION)--6.1-5.sql: $(EXTENSION)--6.1-4.sql $(EXTENSION)--6.1-4--6.1-5.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.1-6.sql: $(EXTENSION)--6.1-5.sql $(EXTENSION)--6.1-5--6.1-6.sql $(EXTENSION)--6.1-6.sql: $(EXTENSION)--6.1-5.sql $(EXTENSION)--6.1-5--6.1-6.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.1-7.sql: $(EXTENSION)--6.1-6.sql $(EXTENSION)--6.1-6--6.1-7.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,12 @@
/* citus--6.1-6--6.1-7.sql */
SET search_path = 'pg_catalog';
CREATE FUNCTION get_shard_id_for_distribution_column(table_name regclass, distribution_value "any" DEFAULT NULL)
RETURNS bigint
LANGUAGE C
AS 'MODULE_PATHNAME', $$get_shard_id_for_distribution_column$$;
COMMENT ON FUNCTION get_shard_id_for_distribution_column(table_name regclass, distribution_value "any")
IS 'return shard id which belongs to given table and contains given value';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '6.1-6' default_version = '6.1-7'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -27,7 +27,9 @@
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_node.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@ -60,6 +62,7 @@ static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapT
PG_FUNCTION_INFO_V1(master_add_node); PG_FUNCTION_INFO_V1(master_add_node);
PG_FUNCTION_INFO_V1(master_remove_node); PG_FUNCTION_INFO_V1(master_remove_node);
PG_FUNCTION_INFO_V1(master_initialize_node_metadata); PG_FUNCTION_INFO_V1(master_initialize_node_metadata);
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
/* /*
@ -144,6 +147,118 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
} }
/*
* get_shard_id_for_distribution_column function takes a distributed table name and a
* distribution value then returns shard id of the shard which belongs to given table and
* contains given value. This function only works for hash distributed tables.
*/
Datum
get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
{
Oid relationId = InvalidOid;
Datum distributionValue = 0;
Var *distributionColumn = NULL;
char distributionMethod = 0;
Oid expectedElementType = InvalidOid;
Oid inputElementType = InvalidOid;
DistTableCacheEntry *cacheEntry = NULL;
int shardCount = 0;
ShardInterval **shardIntervalArray = NULL;
FmgrInfo *hashFunction = NULL;
FmgrInfo *compareFunction = NULL;
bool useBinarySearch = true;
ShardInterval *shardInterval = NULL;
/*
* To have optional parameter as NULL, we defined this UDF as not strict, therefore
* we need to check all parameters for NULL values.
*/
if (PG_ARGISNULL(0))
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("relation cannot be NULL")));
}
relationId = PG_GETARG_OID(0);
EnsureTablePermissions(relationId, ACL_SELECT);
if (!IsDistributedTable(relationId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("relation is not distributed")));
}
distributionMethod = PartitionMethod(relationId);
if (distributionMethod == DISTRIBUTE_BY_NONE)
{
List *shardIntervalList = LoadShardIntervalList(relationId);
if (shardIntervalList == NIL)
{
PG_RETURN_INT64(NULL);
}
shardInterval = (ShardInterval *) linitial(shardIntervalList);
}
else if (distributionMethod == DISTRIBUTE_BY_HASH ||
distributionMethod == DISTRIBUTE_BY_RANGE)
{
/* if given table is not reference table, distributionValue cannot be NULL */
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("distribution value cannot be NULL for tables other "
"than reference tables.")));
}
distributionValue = PG_GETARG_DATUM(1);
distributionColumn = PartitionKey(relationId);
expectedElementType = distributionColumn->vartype;
inputElementType = get_fn_expr_argtype(fcinfo->flinfo, 1);
if (expectedElementType != inputElementType)
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("invalid distribution value type"),
errdetail("Type of the value does not match the type of the "
"distribution column. Expected type id: %d, given "
"type id: %d", expectedElementType,
inputElementType)));
}
cacheEntry = DistributedTableCacheEntry(relationId);
if (distributionMethod == DISTRIBUTE_BY_HASH &&
cacheEntry->hasUniformHashDistribution)
{
useBinarySearch = false;
}
shardCount = cacheEntry->shardIntervalArrayLength;
shardIntervalArray = cacheEntry->sortedShardIntervalArray;
hashFunction = cacheEntry->hashFunction;
compareFunction = cacheEntry->shardIntervalCompareFunction;
shardInterval = FindShardInterval(distributionValue, shardIntervalArray,
shardCount, distributionMethod, compareFunction,
hashFunction, useBinarySearch);
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("finding shard id of given distribution value is only "
"supported for hash partitioned tables, range partitioned "
"tables and reference tables.")));
}
if (shardInterval != NULL)
{
PG_RETURN_INT64(shardInterval->shardId);
}
PG_RETURN_INT64(NULL);
}
/* /*
* FindWorkerNode searches over the worker nodes and returns the workerNode * FindWorkerNode searches over the worker nodes and returns the workerNode
* if it already exists. Else, the function returns NULL. * if it already exists. Else, the function returns NULL.

View File

@ -300,3 +300,235 @@ SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5;
0 0
(1 row) (1 row)
-- test get_shard_id_for_distribution_column
SET citus.shard_count TO 4;
CREATE TABLE get_shardid_test_table1(column1 int, column2 int);
SELECT create_distributed_table('get_shardid_test_table1', 'column1');
create_distributed_table
--------------------------
(1 row)
\COPY get_shardid_test_table1 FROM STDIN with delimiter '|';
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1);
get_shard_id_for_distribution_column
--------------------------------------
540006
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2);
get_shard_id_for_distribution_column
--------------------------------------
540009
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3);
get_shard_id_for_distribution_column
--------------------------------------
540007
(1 row)
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SELECT * FROM get_shardid_test_table1_540006;
column1 | column2
---------+---------
1 | 1
(1 row)
SELECT * FROM get_shardid_test_table1_540009;
column1 | column2
---------+---------
2 | 2
(1 row)
SELECT * FROM get_shardid_test_table1_540007;
column1 | column2
---------+---------
3 | 3
(1 row)
\c - - - :master_port
-- test non-existing value
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
get_shard_id_for_distribution_column
--------------------------------------
540007
(1 row)
-- test array type
SET citus.shard_count TO 4;
CREATE TABLE get_shardid_test_table2(column1 text[], column2 int);
SELECT create_distributed_table('get_shardid_test_table2', 'column1');
create_distributed_table
--------------------------
(1 row)
\COPY get_shardid_test_table2 FROM STDIN with delimiter '|';
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]);
get_shard_id_for_distribution_column
--------------------------------------
540013
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]);
get_shard_id_for_distribution_column
--------------------------------------
540011
(1 row)
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SELECT * FROM get_shardid_test_table2_540013;
column1 | column2
---------+---------
{a,b,c} | 1
(1 row)
SELECT * FROM get_shardid_test_table2_540011;
column1 | column2
---------+---------
{d,e,f} | 2
(1 row)
\c - - - :master_port
-- test mismatching data type
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
ERROR: invalid distribution value type
DETAIL: Type of the value does not match the type of the distribution column. Expected type id: 1009, given type id: 25
-- test NULL distribution column value for hash distributed table
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2');
ERROR: distribution value cannot be NULL for tables other than reference tables.
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', NULL);
ERROR: distribution value cannot be NULL for tables other than reference tables.
-- test non-distributed table
CREATE TABLE get_shardid_test_table3(column1 int, column2 int);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
ERROR: relation is not distributed
-- test append distributed table
SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
create_distributed_table
--------------------------
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
ERROR: finding shard id of given distribution value is only supported for hash partitioned tables, range partitioned tables and reference tables.
-- test reference table;
CREATE TABLE get_shardid_test_table4(column1 int, column2 int);
SELECT create_reference_table('get_shardid_test_table4');
create_reference_table
------------------------
(1 row)
-- test NULL distribution column value for reference table
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4');
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL);
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
-- test different data types for reference table
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1);
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text);
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]);
get_shard_id_for_distribution_column
--------------------------------------
540014
(1 row)
-- test range distributed table
CREATE TABLE get_shardid_test_table5(column1 int, column2 int);
SELECT create_distributed_table('get_shardid_test_table5', 'column1', 'range');
create_distributed_table
--------------------------
(1 row)
-- create worker shards
SELECT master_create_empty_shard('get_shardid_test_table5');
master_create_empty_shard
---------------------------
540015
(1 row)
SELECT master_create_empty_shard('get_shardid_test_table5');
master_create_empty_shard
---------------------------
540016
(1 row)
SELECT master_create_empty_shard('get_shardid_test_table5');
master_create_empty_shard
---------------------------
540017
(1 row)
SELECT master_create_empty_shard('get_shardid_test_table5');
master_create_empty_shard
---------------------------
540018
(1 row)
-- now the comparison is done via the partition column type, which is text
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 1000 WHERE shardid = 540015;
UPDATE pg_dist_shard SET shardminvalue = 1001, shardmaxvalue = 2000 WHERE shardid = 540016;
UPDATE pg_dist_shard SET shardminvalue = 2001, shardmaxvalue = 3000 WHERE shardid = 540017;
UPDATE pg_dist_shard SET shardminvalue = 3001, shardmaxvalue = 4000 WHERE shardid = 540018;
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 5);
get_shard_id_for_distribution_column
--------------------------------------
540015
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 1111);
get_shard_id_for_distribution_column
--------------------------------------
540016
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 2689);
get_shard_id_for_distribution_column
--------------------------------------
540017
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 3248);
get_shard_id_for_distribution_column
--------------------------------------
540018
(1 row)
-- test non-existing value for range distributed tables
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 4001);
get_shard_id_for_distribution_column
--------------------------------------
0
(1 row)
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', -999);
get_shard_id_for_distribution_column
--------------------------------------
0
(1 row)
-- clear unnecessary tables;
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5;

View File

@ -64,6 +64,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-3';
ALTER EXTENSION citus UPDATE TO '6.1-4'; ALTER EXTENSION citus UPDATE TO '6.1-4';
ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-5';
ALTER EXTENSION citus UPDATE TO '6.1-6'; ALTER EXTENSION citus UPDATE TO '6.1-6';
ALTER EXTENSION citus UPDATE TO '6.1-7';
-- ensure no objects were created outside pg_catalog -- ensure no objects were created outside pg_catalog
SELECT COUNT(*) SELECT COUNT(*)
FROM pg_depend AS pgd, FROM pg_depend AS pgd,

View File

@ -202,3 +202,99 @@ COMMIT;
-- lock should be gone now -- lock should be gone now
SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5; SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5;
-- test get_shard_id_for_distribution_column
SET citus.shard_count TO 4;
CREATE TABLE get_shardid_test_table1(column1 int, column2 int);
SELECT create_distributed_table('get_shardid_test_table1', 'column1');
\COPY get_shardid_test_table1 FROM STDIN with delimiter '|';
1|1
2|2
3|3
\.
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3);
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SELECT * FROM get_shardid_test_table1_540006;
SELECT * FROM get_shardid_test_table1_540009;
SELECT * FROM get_shardid_test_table1_540007;
\c - - - :master_port
-- test non-existing value
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
-- test array type
SET citus.shard_count TO 4;
CREATE TABLE get_shardid_test_table2(column1 text[], column2 int);
SELECT create_distributed_table('get_shardid_test_table2', 'column1');
\COPY get_shardid_test_table2 FROM STDIN with delimiter '|';
{a, b, c}|1
{d, e, f}|2
\.
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]);
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SELECT * FROM get_shardid_test_table2_540013;
SELECT * FROM get_shardid_test_table2_540011;
\c - - - :master_port
-- test mismatching data type
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
-- test NULL distribution column value for hash distributed table
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2');
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', NULL);
-- test non-distributed table
CREATE TABLE get_shardid_test_table3(column1 int, column2 int);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
-- test append distributed table
SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
-- test reference table;
CREATE TABLE get_shardid_test_table4(column1 int, column2 int);
SELECT create_reference_table('get_shardid_test_table4');
-- test NULL distribution column value for reference table
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4');
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL);
-- test different data types for reference table
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]);
-- test range distributed table
CREATE TABLE get_shardid_test_table5(column1 int, column2 int);
SELECT create_distributed_table('get_shardid_test_table5', 'column1', 'range');
-- create worker shards
SELECT master_create_empty_shard('get_shardid_test_table5');
SELECT master_create_empty_shard('get_shardid_test_table5');
SELECT master_create_empty_shard('get_shardid_test_table5');
SELECT master_create_empty_shard('get_shardid_test_table5');
-- now the comparison is done via the partition column type, which is text
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 1000 WHERE shardid = 540015;
UPDATE pg_dist_shard SET shardminvalue = 1001, shardmaxvalue = 2000 WHERE shardid = 540016;
UPDATE pg_dist_shard SET shardminvalue = 2001, shardmaxvalue = 3000 WHERE shardid = 540017;
UPDATE pg_dist_shard SET shardminvalue = 3001, shardmaxvalue = 4000 WHERE shardid = 540018;
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 5);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 1111);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 2689);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 3248);
-- test non-existing value for range distributed tables
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 4001);
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', -999);
-- clear unnecessary tables;
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5;

View File

@ -64,6 +64,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-3';
ALTER EXTENSION citus UPDATE TO '6.1-4'; ALTER EXTENSION citus UPDATE TO '6.1-4';
ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-5';
ALTER EXTENSION citus UPDATE TO '6.1-6'; ALTER EXTENSION citus UPDATE TO '6.1-6';
ALTER EXTENSION citus UPDATE TO '6.1-7';
-- ensure no objects were created outside pg_catalog -- ensure no objects were created outside pg_catalog
SELECT COUNT(*) SELECT COUNT(*)