mirror of https://github.com/citusdata/citus.git
Add get_distribution_value_shardid UDF
With this UDF users can now map given distribution value to shard id. We mostly hide shardids from users to prevent unnecessary complexity but some power users might need to know about which entry/value is stored in which shard for maintanence purposes. Signature of this UDF is as follows; bigint get_distribution_value_shardid(table_name regclass, distribution_value anyelement)pull/1048/head
parent
ce3fec00e5
commit
b7972ed056
|
@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
|||
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
||||
5.2-1 5.2-2 5.2-3 5.2-4 \
|
||||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
||||
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6
|
||||
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7
|
||||
|
||||
# All citus--*.sql files in the source directory
|
||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||
|
@ -107,6 +107,8 @@ $(EXTENSION)--6.1-5.sql: $(EXTENSION)--6.1-4.sql $(EXTENSION)--6.1-4--6.1-5.sql
|
|||
cat $^ > $@
|
||||
$(EXTENSION)--6.1-6.sql: $(EXTENSION)--6.1-5.sql $(EXTENSION)--6.1-5--6.1-6.sql
|
||||
cat $^ > $@
|
||||
$(EXTENSION)--6.1-7.sql: $(EXTENSION)--6.1-6.sql $(EXTENSION)--6.1-6--6.1-7.sql
|
||||
cat $^ > $@
|
||||
|
||||
NO_PGXS = 1
|
||||
|
||||
|
|
|
@ -6,4 +6,4 @@ SET search_path = 'pg_catalog';
|
|||
-- wouldn't have partition columns, which we represent as NULL
|
||||
ALTER TABLE pg_dist_partition ALTER COLUMN partkey DROP NOT NULL;
|
||||
|
||||
RESET search_path;
|
||||
RESET search_path;
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
/* citus--6.1-6--6.1-7.sql */
|
||||
|
||||
SET search_path = 'pg_catalog';
|
||||
|
||||
CREATE FUNCTION get_shard_id_for_distribution_column(table_name regclass, distribution_value "any" DEFAULT NULL)
|
||||
RETURNS bigint
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$get_shard_id_for_distribution_column$$;
|
||||
COMMENT ON FUNCTION get_shard_id_for_distribution_column(table_name regclass, distribution_value "any")
|
||||
IS 'return shard id which belongs to given table and contains given value';
|
||||
|
||||
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '6.1-6'
|
||||
default_version = '6.1-7'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -27,7 +27,9 @@
|
|||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/pg_dist_node.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
#include "lib/stringinfo.h"
|
||||
|
@ -60,6 +62,7 @@ static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapT
|
|||
PG_FUNCTION_INFO_V1(master_add_node);
|
||||
PG_FUNCTION_INFO_V1(master_remove_node);
|
||||
PG_FUNCTION_INFO_V1(master_initialize_node_metadata);
|
||||
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -144,6 +147,67 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* get_shard_id_for_distribution_column function takes a distributed table name and a
|
||||
* distribution value then returns shard id of the shard which belongs to given table and
|
||||
* contains given value. This function only works for hash distributed tables.
|
||||
*/
|
||||
Datum
|
||||
get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
Datum distributionValue = PG_GETARG_DATUM(1);
|
||||
|
||||
Var *distributionColumn = NULL;
|
||||
char distributionMethod = 0;
|
||||
Oid expectedElementType = InvalidOid;
|
||||
Oid inputElementType = InvalidOid;
|
||||
DistTableCacheEntry *cacheEntry = NULL;
|
||||
int shardCount = 0;
|
||||
ShardInterval **shardIntervalArray = NULL;
|
||||
FmgrInfo *hashFunction = NULL;
|
||||
FmgrInfo *compareFunction = NULL;
|
||||
bool useBinarySearch = true;
|
||||
ShardInterval *shardInterval = NULL;
|
||||
|
||||
if (!IsDistributedTable(relationId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||
errmsg("relation is not distributed")));
|
||||
}
|
||||
|
||||
distributionMethod = PartitionMethod(relationId);
|
||||
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("finding shard id of given distribution value is not "
|
||||
"supported for non-hash partitioned tables")));
|
||||
}
|
||||
|
||||
distributionColumn = PartitionKey(relationId);
|
||||
expectedElementType = distributionColumn->vartype;
|
||||
inputElementType = get_fn_expr_argtype(fcinfo->flinfo, 1);
|
||||
if (expectedElementType != inputElementType)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("invalid distribution value type"),
|
||||
errdetail("Type of the value does not match the type of the "
|
||||
"distribution column.")));
|
||||
}
|
||||
|
||||
cacheEntry = DistributedTableCacheEntry(relationId);
|
||||
shardCount = cacheEntry->shardIntervalArrayLength;
|
||||
shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
||||
hashFunction = cacheEntry->hashFunction;
|
||||
compareFunction = cacheEntry->shardIntervalCompareFunction;
|
||||
shardInterval = FindShardInterval(distributionValue, shardIntervalArray, shardCount,
|
||||
distributionMethod, compareFunction, hashFunction,
|
||||
useBinarySearch);
|
||||
|
||||
PG_RETURN_INT64(shardInterval->shardId);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindWorkerNode searches over the worker nodes and returns the workerNode
|
||||
* if it already exists. Else, the function returns NULL.
|
||||
|
|
|
@ -300,3 +300,115 @@ SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5;
|
|||
0
|
||||
(1 row)
|
||||
|
||||
-- test get_shard_id_for_distribution_column
|
||||
SET citus.shard_count TO 4;
|
||||
CREATE TABLE get_shardid_test_table1(column1 int, column2 int);
|
||||
SELECT create_distributed_table('get_shardid_test_table1', 'column1');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\COPY get_shardid_test_table1 FROM STDIN with delimiter '|';
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1);
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540006
|
||||
(1 row)
|
||||
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2);
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540009
|
||||
(1 row)
|
||||
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3);
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540007
|
||||
(1 row)
|
||||
|
||||
-- verify result of the get_shard_id_for_distribution_column
|
||||
\c - - - :worker_1_port
|
||||
SELECT * FROM get_shardid_test_table1_540006;
|
||||
column1 | column2
|
||||
---------+---------
|
||||
1 | 1
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM get_shardid_test_table1_540009;
|
||||
column1 | column2
|
||||
---------+---------
|
||||
2 | 2
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM get_shardid_test_table1_540007;
|
||||
column1 | column2
|
||||
---------+---------
|
||||
3 | 3
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
-- test non-existing value
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540007
|
||||
(1 row)
|
||||
|
||||
-- test array type
|
||||
SET citus.shard_count TO 4;
|
||||
CREATE TABLE get_shardid_test_table2(column1 text[], column2 int);
|
||||
SELECT create_distributed_table('get_shardid_test_table2', 'column1');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\COPY get_shardid_test_table2 FROM STDIN with delimiter '|';
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]);
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540013
|
||||
(1 row)
|
||||
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]);
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540011
|
||||
(1 row)
|
||||
|
||||
-- verify result of the get_shard_id_for_distribution_column
|
||||
\c - - - :worker_1_port
|
||||
SELECT * FROM get_shardid_test_table2_540013;
|
||||
column1 | column2
|
||||
---------+---------
|
||||
{a,b,c} | 1
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM get_shardid_test_table2_540011;
|
||||
column1 | column2
|
||||
---------+---------
|
||||
{d,e,f} | 2
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
-- test mismatching data type
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
|
||||
ERROR: invalid distribution value type
|
||||
DETAIL: Type of the value does not match the type of the distribution column.
|
||||
-- test non-distributed table
|
||||
CREATE TABLE get_shardid_test_table3(column1 int, column2 int);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
||||
ERROR: relation is not distributed
|
||||
-- test non-hash distributed table
|
||||
SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
||||
ERROR: finding shard id of given distribution value is not supported for non-hash partitioned tables
|
||||
-- clear unnecessary tables;
|
||||
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3;
|
||||
|
|
|
@ -64,6 +64,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-3';
|
|||
ALTER EXTENSION citus UPDATE TO '6.1-4';
|
||||
ALTER EXTENSION citus UPDATE TO '6.1-5';
|
||||
ALTER EXTENSION citus UPDATE TO '6.1-6';
|
||||
ALTER EXTENSION citus UPDATE TO '6.1-7';
|
||||
-- ensure no objects were created outside pg_catalog
|
||||
SELECT COUNT(*)
|
||||
FROM pg_depend AS pgd,
|
||||
|
|
|
@ -202,3 +202,57 @@ COMMIT;
|
|||
|
||||
-- lock should be gone now
|
||||
SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5;
|
||||
|
||||
-- test get_shard_id_for_distribution_column
|
||||
SET citus.shard_count TO 4;
|
||||
CREATE TABLE get_shardid_test_table1(column1 int, column2 int);
|
||||
SELECT create_distributed_table('get_shardid_test_table1', 'column1');
|
||||
\COPY get_shardid_test_table1 FROM STDIN with delimiter '|';
|
||||
1|1
|
||||
2|2
|
||||
3|3
|
||||
\.
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3);
|
||||
|
||||
-- verify result of the get_shard_id_for_distribution_column
|
||||
\c - - - :worker_1_port
|
||||
SELECT * FROM get_shardid_test_table1_540006;
|
||||
SELECT * FROM get_shardid_test_table1_540009;
|
||||
SELECT * FROM get_shardid_test_table1_540007;
|
||||
\c - - - :master_port
|
||||
|
||||
-- test non-existing value
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
|
||||
|
||||
-- test array type
|
||||
SET citus.shard_count TO 4;
|
||||
CREATE TABLE get_shardid_test_table2(column1 text[], column2 int);
|
||||
SELECT create_distributed_table('get_shardid_test_table2', 'column1');
|
||||
\COPY get_shardid_test_table2 FROM STDIN with delimiter '|';
|
||||
{a, b, c}|1
|
||||
{d, e, f}|2
|
||||
\.
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]);
|
||||
|
||||
-- verify result of the get_shard_id_for_distribution_column
|
||||
\c - - - :worker_1_port
|
||||
SELECT * FROM get_shardid_test_table2_540013;
|
||||
SELECT * FROM get_shardid_test_table2_540011;
|
||||
\c - - - :master_port
|
||||
|
||||
-- test mismatching data type
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
|
||||
|
||||
-- test non-distributed table
|
||||
CREATE TABLE get_shardid_test_table3(column1 int, column2 int);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
||||
|
||||
-- test non-hash distributed table
|
||||
SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
||||
|
||||
-- clear unnecessary tables;
|
||||
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3;
|
||||
|
|
|
@ -64,6 +64,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-3';
|
|||
ALTER EXTENSION citus UPDATE TO '6.1-4';
|
||||
ALTER EXTENSION citus UPDATE TO '6.1-5';
|
||||
ALTER EXTENSION citus UPDATE TO '6.1-6';
|
||||
ALTER EXTENSION citus UPDATE TO '6.1-7';
|
||||
|
||||
-- ensure no objects were created outside pg_catalog
|
||||
SELECT COUNT(*)
|
||||
|
|
Loading…
Reference in New Issue