From b7972ed056cb94ce8e81458d0d54c3239e4a1ed2 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Fri, 16 Dec 2016 06:34:41 +0300 Subject: [PATCH] Add get_distribution_value_shardid UDF With this UDF users can now map given distribution value to shard id. We mostly hide shardids from users to prevent unnecessary complexity but some power users might need to know about which entry/value is stored in which shard for maintanence purposes. Signature of this UDF is as follows; bigint get_distribution_value_shardid(table_name regclass, distribution_value anyelement) --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.1-5--6.1-6.sql | 2 +- .../distributed/citus--6.1-6--6.1-7.sql | 12 ++ src/backend/distributed/citus.control | 2 +- src/backend/distributed/utils/node_metadata.c | 64 ++++++++++ .../expected/multi_distribution_metadata.out | 112 ++++++++++++++++++ src/test/regress/expected/multi_extension.out | 1 + .../sql/multi_distribution_metadata.sql | 54 +++++++++ src/test/regress/sql/multi_extension.sql | 1 + 9 files changed, 249 insertions(+), 3 deletions(-) create mode 100644 src/backend/distributed/citus--6.1-6--6.1-7.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 6a99a066b..7b4746bb0 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -107,6 +107,8 @@ $(EXTENSION)--6.1-5.sql: $(EXTENSION)--6.1-4.sql $(EXTENSION)--6.1-4--6.1-5.sql cat $^ > $@ $(EXTENSION)--6.1-6.sql: $(EXTENSION)--6.1-5.sql $(EXTENSION)--6.1-5--6.1-6.sql cat $^ > $@ +$(EXTENSION)--6.1-7.sql: $(EXTENSION)--6.1-6.sql $(EXTENSION)--6.1-6--6.1-7.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-5--6.1-6.sql b/src/backend/distributed/citus--6.1-5--6.1-6.sql index a186212da..daa01f2dd 100644 --- a/src/backend/distributed/citus--6.1-5--6.1-6.sql +++ b/src/backend/distributed/citus--6.1-5--6.1-6.sql @@ -6,4 +6,4 @@ SET search_path = 'pg_catalog'; -- wouldn't have partition columns, which we represent as NULL ALTER TABLE pg_dist_partition ALTER COLUMN partkey DROP NOT NULL; -RESET search_path; \ No newline at end of file +RESET search_path; diff --git a/src/backend/distributed/citus--6.1-6--6.1-7.sql b/src/backend/distributed/citus--6.1-6--6.1-7.sql new file mode 100644 index 000000000..28c00acc9 --- /dev/null +++ b/src/backend/distributed/citus--6.1-6--6.1-7.sql @@ -0,0 +1,12 @@ +/* citus--6.1-6--6.1-7.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION get_shard_id_for_distribution_column(table_name regclass, distribution_value "any" DEFAULT NULL) + RETURNS bigint + LANGUAGE C + AS 'MODULE_PATHNAME', $$get_shard_id_for_distribution_column$$; +COMMENT ON FUNCTION get_shard_id_for_distribution_column(table_name regclass, distribution_value "any") + IS 'return shard id which belongs to given table and contains given value'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 8f7d391de..ca50cbb41 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-6' +default_version = '6.1-7' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index f6379ca58..dc089782b 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -27,7 +27,9 @@ #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_join_order.h" #include "distributed/pg_dist_node.h" +#include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "lib/stringinfo.h" @@ -60,6 +62,7 @@ static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapT PG_FUNCTION_INFO_V1(master_add_node); PG_FUNCTION_INFO_V1(master_remove_node); PG_FUNCTION_INFO_V1(master_initialize_node_metadata); +PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); /* @@ -144,6 +147,67 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) } +/* + * get_shard_id_for_distribution_column function takes a distributed table name and a + * distribution value then returns shard id of the shard which belongs to given table and + * contains given value. This function only works for hash distributed tables. + */ +Datum +get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + Datum distributionValue = PG_GETARG_DATUM(1); + + Var *distributionColumn = NULL; + char distributionMethod = 0; + Oid expectedElementType = InvalidOid; + Oid inputElementType = InvalidOid; + DistTableCacheEntry *cacheEntry = NULL; + int shardCount = 0; + ShardInterval **shardIntervalArray = NULL; + FmgrInfo *hashFunction = NULL; + FmgrInfo *compareFunction = NULL; + bool useBinarySearch = true; + ShardInterval *shardInterval = NULL; + + if (!IsDistributedTable(relationId)) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("relation is not distributed"))); + } + + distributionMethod = PartitionMethod(relationId); + if (distributionMethod != DISTRIBUTE_BY_HASH) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("finding shard id of given distribution value is not " + "supported for non-hash partitioned tables"))); + } + + distributionColumn = PartitionKey(relationId); + expectedElementType = distributionColumn->vartype; + inputElementType = get_fn_expr_argtype(fcinfo->flinfo, 1); + if (expectedElementType != inputElementType) + { + ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("invalid distribution value type"), + errdetail("Type of the value does not match the type of the " + "distribution column."))); + } + + cacheEntry = DistributedTableCacheEntry(relationId); + shardCount = cacheEntry->shardIntervalArrayLength; + shardIntervalArray = cacheEntry->sortedShardIntervalArray; + hashFunction = cacheEntry->hashFunction; + compareFunction = cacheEntry->shardIntervalCompareFunction; + shardInterval = FindShardInterval(distributionValue, shardIntervalArray, shardCount, + distributionMethod, compareFunction, hashFunction, + useBinarySearch); + + PG_RETURN_INT64(shardInterval->shardId); +} + + /* * FindWorkerNode searches over the worker nodes and returns the workerNode * if it already exists. Else, the function returns NULL. diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index 22740e09f..d92b3d770 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -300,3 +300,115 @@ SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5; 0 (1 row) +-- test get_shard_id_for_distribution_column +SET citus.shard_count TO 4; +CREATE TABLE get_shardid_test_table1(column1 int, column2 int); +SELECT create_distributed_table('get_shardid_test_table1', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY get_shardid_test_table1 FROM STDIN with delimiter '|'; +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1); + get_shard_id_for_distribution_column +-------------------------------------- + 540006 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2); + get_shard_id_for_distribution_column +-------------------------------------- + 540009 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3); + get_shard_id_for_distribution_column +-------------------------------------- + 540007 +(1 row) + +-- verify result of the get_shard_id_for_distribution_column +\c - - - :worker_1_port +SELECT * FROM get_shardid_test_table1_540006; + column1 | column2 +---------+--------- + 1 | 1 +(1 row) + +SELECT * FROM get_shardid_test_table1_540009; + column1 | column2 +---------+--------- + 2 | 2 +(1 row) + +SELECT * FROM get_shardid_test_table1_540007; + column1 | column2 +---------+--------- + 3 | 3 +(1 row) + +\c - - - :master_port +-- test non-existing value +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4); + get_shard_id_for_distribution_column +-------------------------------------- + 540007 +(1 row) + +-- test array type +SET citus.shard_count TO 4; +CREATE TABLE get_shardid_test_table2(column1 text[], column2 int); +SELECT create_distributed_table('get_shardid_test_table2', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY get_shardid_test_table2 FROM STDIN with delimiter '|'; +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]); + get_shard_id_for_distribution_column +-------------------------------------- + 540013 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]); + get_shard_id_for_distribution_column +-------------------------------------- + 540011 +(1 row) + +-- verify result of the get_shard_id_for_distribution_column +\c - - - :worker_1_port +SELECT * FROM get_shardid_test_table2_540013; + column1 | column2 +---------+--------- + {a,b,c} | 1 +(1 row) + +SELECT * FROM get_shardid_test_table2_540011; + column1 | column2 +---------+--------- + {d,e,f} | 2 +(1 row) + +\c - - - :master_port +-- test mismatching data type +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text); +ERROR: invalid distribution value type +DETAIL: Type of the value does not match the type of the distribution column. +-- test non-distributed table +CREATE TABLE get_shardid_test_table3(column1 int, column2 int); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); +ERROR: relation is not distributed +-- test non-hash distributed table +SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); +ERROR: finding shard id of given distribution value is not supported for non-hash partitioned tables +-- clear unnecessary tables; +DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index e7c814ef0..54a8777ce 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -64,6 +64,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-4'; ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-6'; +ALTER EXTENSION citus UPDATE TO '6.1-7'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/sql/multi_distribution_metadata.sql b/src/test/regress/sql/multi_distribution_metadata.sql index 41ad310e7..5d51dcca6 100644 --- a/src/test/regress/sql/multi_distribution_metadata.sql +++ b/src/test/regress/sql/multi_distribution_metadata.sql @@ -202,3 +202,57 @@ COMMIT; -- lock should be gone now SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5; + +-- test get_shard_id_for_distribution_column +SET citus.shard_count TO 4; +CREATE TABLE get_shardid_test_table1(column1 int, column2 int); +SELECT create_distributed_table('get_shardid_test_table1', 'column1'); +\COPY get_shardid_test_table1 FROM STDIN with delimiter '|'; +1|1 +2|2 +3|3 +\. +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3); + +-- verify result of the get_shard_id_for_distribution_column +\c - - - :worker_1_port +SELECT * FROM get_shardid_test_table1_540006; +SELECT * FROM get_shardid_test_table1_540009; +SELECT * FROM get_shardid_test_table1_540007; +\c - - - :master_port + +-- test non-existing value +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4); + +-- test array type +SET citus.shard_count TO 4; +CREATE TABLE get_shardid_test_table2(column1 text[], column2 int); +SELECT create_distributed_table('get_shardid_test_table2', 'column1'); +\COPY get_shardid_test_table2 FROM STDIN with delimiter '|'; +{a, b, c}|1 +{d, e, f}|2 +\. +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]); + +-- verify result of the get_shard_id_for_distribution_column +\c - - - :worker_1_port +SELECT * FROM get_shardid_test_table2_540013; +SELECT * FROM get_shardid_test_table2_540011; +\c - - - :master_port + +-- test mismatching data type +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text); + +-- test non-distributed table +CREATE TABLE get_shardid_test_table3(column1 int, column2 int); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); + +-- test non-hash distributed table +SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append'); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); + +-- clear unnecessary tables; +DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 46c167837..8dc7a8717 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -64,6 +64,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-4'; ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-6'; +ALTER EXTENSION citus UPDATE TO '6.1-7'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*)