diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 6a99a066b..7b4746bb0 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -107,6 +107,8 @@ $(EXTENSION)--6.1-5.sql: $(EXTENSION)--6.1-4.sql $(EXTENSION)--6.1-4--6.1-5.sql cat $^ > $@ $(EXTENSION)--6.1-6.sql: $(EXTENSION)--6.1-5.sql $(EXTENSION)--6.1-5--6.1-6.sql cat $^ > $@ +$(EXTENSION)--6.1-7.sql: $(EXTENSION)--6.1-6.sql $(EXTENSION)--6.1-6--6.1-7.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-5--6.1-6.sql b/src/backend/distributed/citus--6.1-5--6.1-6.sql index a186212da..daa01f2dd 100644 --- a/src/backend/distributed/citus--6.1-5--6.1-6.sql +++ b/src/backend/distributed/citus--6.1-5--6.1-6.sql @@ -6,4 +6,4 @@ SET search_path = 'pg_catalog'; -- wouldn't have partition columns, which we represent as NULL ALTER TABLE pg_dist_partition ALTER COLUMN partkey DROP NOT NULL; -RESET search_path; \ No newline at end of file +RESET search_path; diff --git a/src/backend/distributed/citus--6.1-6--6.1-7.sql b/src/backend/distributed/citus--6.1-6--6.1-7.sql new file mode 100644 index 000000000..28c00acc9 --- /dev/null +++ b/src/backend/distributed/citus--6.1-6--6.1-7.sql @@ -0,0 +1,12 @@ +/* citus--6.1-6--6.1-7.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION get_shard_id_for_distribution_column(table_name regclass, distribution_value "any" DEFAULT NULL) + RETURNS bigint + LANGUAGE C + AS 'MODULE_PATHNAME', $$get_shard_id_for_distribution_column$$; +COMMENT ON FUNCTION get_shard_id_for_distribution_column(table_name regclass, distribution_value "any") + IS 'return shard id which belongs to given table and contains given value'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 8f7d391de..ca50cbb41 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-6' +default_version = '6.1-7' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index f6379ca58..c8c5eaf07 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -27,7 +27,9 @@ #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_join_order.h" #include "distributed/pg_dist_node.h" +#include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "lib/stringinfo.h" @@ -60,6 +62,7 @@ static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapT PG_FUNCTION_INFO_V1(master_add_node); PG_FUNCTION_INFO_V1(master_remove_node); PG_FUNCTION_INFO_V1(master_initialize_node_metadata); +PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); /* @@ -144,6 +147,118 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) } +/* + * get_shard_id_for_distribution_column function takes a distributed table name and a + * distribution value then returns shard id of the shard which belongs to given table and + * contains given value. This function only works for hash distributed tables. + */ +Datum +get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) +{ + Oid relationId = InvalidOid; + Datum distributionValue = 0; + + Var *distributionColumn = NULL; + char distributionMethod = 0; + Oid expectedElementType = InvalidOid; + Oid inputElementType = InvalidOid; + DistTableCacheEntry *cacheEntry = NULL; + int shardCount = 0; + ShardInterval **shardIntervalArray = NULL; + FmgrInfo *hashFunction = NULL; + FmgrInfo *compareFunction = NULL; + bool useBinarySearch = true; + ShardInterval *shardInterval = NULL; + + /* + * To have optional parameter as NULL, we defined this UDF as not strict, therefore + * we need to check all parameters for NULL values. + */ + if (PG_ARGISNULL(0)) + { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("relation cannot be NULL"))); + } + + relationId = PG_GETARG_OID(0); + EnsureTablePermissions(relationId, ACL_SELECT); + + if (!IsDistributedTable(relationId)) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("relation is not distributed"))); + } + + distributionMethod = PartitionMethod(relationId); + if (distributionMethod == DISTRIBUTE_BY_NONE) + { + List *shardIntervalList = LoadShardIntervalList(relationId); + if (shardIntervalList == NIL) + { + PG_RETURN_INT64(NULL); + } + + shardInterval = (ShardInterval *) linitial(shardIntervalList); + } + else if (distributionMethod == DISTRIBUTE_BY_HASH || + distributionMethod == DISTRIBUTE_BY_RANGE) + { + /* if given table is not reference table, distributionValue cannot be NULL */ + if (PG_ARGISNULL(1)) + { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("distribution value cannot be NULL for tables other " + "than reference tables."))); + } + + distributionValue = PG_GETARG_DATUM(1); + + distributionColumn = PartitionKey(relationId); + expectedElementType = distributionColumn->vartype; + inputElementType = get_fn_expr_argtype(fcinfo->flinfo, 1); + if (expectedElementType != inputElementType) + { + ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("invalid distribution value type"), + errdetail("Type of the value does not match the type of the " + "distribution column. Expected type id: %d, given " + "type id: %d", expectedElementType, + inputElementType))); + } + + cacheEntry = DistributedTableCacheEntry(relationId); + + if (distributionMethod == DISTRIBUTE_BY_HASH && + cacheEntry->hasUniformHashDistribution) + { + useBinarySearch = false; + } + + shardCount = cacheEntry->shardIntervalArrayLength; + shardIntervalArray = cacheEntry->sortedShardIntervalArray; + hashFunction = cacheEntry->hashFunction; + compareFunction = cacheEntry->shardIntervalCompareFunction; + shardInterval = FindShardInterval(distributionValue, shardIntervalArray, + shardCount, distributionMethod, compareFunction, + hashFunction, useBinarySearch); + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("finding shard id of given distribution value is only " + "supported for hash partitioned tables, range partitioned " + "tables and reference tables."))); + } + + if (shardInterval != NULL) + { + PG_RETURN_INT64(shardInterval->shardId); + } + + PG_RETURN_INT64(NULL); +} + + /* * FindWorkerNode searches over the worker nodes and returns the workerNode * if it already exists. Else, the function returns NULL. diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index 22740e09f..aebb04d65 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -300,3 +300,235 @@ SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5; 0 (1 row) +-- test get_shard_id_for_distribution_column +SET citus.shard_count TO 4; +CREATE TABLE get_shardid_test_table1(column1 int, column2 int); +SELECT create_distributed_table('get_shardid_test_table1', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY get_shardid_test_table1 FROM STDIN with delimiter '|'; +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1); + get_shard_id_for_distribution_column +-------------------------------------- + 540006 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2); + get_shard_id_for_distribution_column +-------------------------------------- + 540009 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3); + get_shard_id_for_distribution_column +-------------------------------------- + 540007 +(1 row) + +-- verify result of the get_shard_id_for_distribution_column +\c - - - :worker_1_port +SELECT * FROM get_shardid_test_table1_540006; + column1 | column2 +---------+--------- + 1 | 1 +(1 row) + +SELECT * FROM get_shardid_test_table1_540009; + column1 | column2 +---------+--------- + 2 | 2 +(1 row) + +SELECT * FROM get_shardid_test_table1_540007; + column1 | column2 +---------+--------- + 3 | 3 +(1 row) + +\c - - - :master_port +-- test non-existing value +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4); + get_shard_id_for_distribution_column +-------------------------------------- + 540007 +(1 row) + +-- test array type +SET citus.shard_count TO 4; +CREATE TABLE get_shardid_test_table2(column1 text[], column2 int); +SELECT create_distributed_table('get_shardid_test_table2', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +\COPY get_shardid_test_table2 FROM STDIN with delimiter '|'; +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]); + get_shard_id_for_distribution_column +-------------------------------------- + 540013 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]); + get_shard_id_for_distribution_column +-------------------------------------- + 540011 +(1 row) + +-- verify result of the get_shard_id_for_distribution_column +\c - - - :worker_1_port +SELECT * FROM get_shardid_test_table2_540013; + column1 | column2 +---------+--------- + {a,b,c} | 1 +(1 row) + +SELECT * FROM get_shardid_test_table2_540011; + column1 | column2 +---------+--------- + {d,e,f} | 2 +(1 row) + +\c - - - :master_port +-- test mismatching data type +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text); +ERROR: invalid distribution value type +DETAIL: Type of the value does not match the type of the distribution column. Expected type id: 1009, given type id: 25 +-- test NULL distribution column value for hash distributed table +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2'); +ERROR: distribution value cannot be NULL for tables other than reference tables. +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', NULL); +ERROR: distribution value cannot be NULL for tables other than reference tables. +-- test non-distributed table +CREATE TABLE get_shardid_test_table3(column1 int, column2 int); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); +ERROR: relation is not distributed +-- test append distributed table +SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); +ERROR: finding shard id of given distribution value is only supported for hash partitioned tables, range partitioned tables and reference tables. +-- test reference table; +CREATE TABLE get_shardid_test_table4(column1 int, column2 int); +SELECT create_reference_table('get_shardid_test_table4'); + create_reference_table +------------------------ + +(1 row) + +-- test NULL distribution column value for reference table +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4'); + get_shard_id_for_distribution_column +-------------------------------------- + 540014 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL); + get_shard_id_for_distribution_column +-------------------------------------- + 540014 +(1 row) + +-- test different data types for reference table +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1); + get_shard_id_for_distribution_column +-------------------------------------- + 540014 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text); + get_shard_id_for_distribution_column +-------------------------------------- + 540014 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]); + get_shard_id_for_distribution_column +-------------------------------------- + 540014 +(1 row) + +-- test range distributed table +CREATE TABLE get_shardid_test_table5(column1 int, column2 int); +SELECT create_distributed_table('get_shardid_test_table5', 'column1', 'range'); + create_distributed_table +-------------------------- + +(1 row) + +-- create worker shards +SELECT master_create_empty_shard('get_shardid_test_table5'); + master_create_empty_shard +--------------------------- + 540015 +(1 row) + +SELECT master_create_empty_shard('get_shardid_test_table5'); + master_create_empty_shard +--------------------------- + 540016 +(1 row) + +SELECT master_create_empty_shard('get_shardid_test_table5'); + master_create_empty_shard +--------------------------- + 540017 +(1 row) + +SELECT master_create_empty_shard('get_shardid_test_table5'); + master_create_empty_shard +--------------------------- + 540018 +(1 row) + +-- now the comparison is done via the partition column type, which is text +UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 1000 WHERE shardid = 540015; +UPDATE pg_dist_shard SET shardminvalue = 1001, shardmaxvalue = 2000 WHERE shardid = 540016; +UPDATE pg_dist_shard SET shardminvalue = 2001, shardmaxvalue = 3000 WHERE shardid = 540017; +UPDATE pg_dist_shard SET shardminvalue = 3001, shardmaxvalue = 4000 WHERE shardid = 540018; +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 5); + get_shard_id_for_distribution_column +-------------------------------------- + 540015 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 1111); + get_shard_id_for_distribution_column +-------------------------------------- + 540016 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 2689); + get_shard_id_for_distribution_column +-------------------------------------- + 540017 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 3248); + get_shard_id_for_distribution_column +-------------------------------------- + 540018 +(1 row) + +-- test non-existing value for range distributed tables +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 4001); + get_shard_id_for_distribution_column +-------------------------------------- + 0 +(1 row) + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', -999); + get_shard_id_for_distribution_column +-------------------------------------- + 0 +(1 row) + +-- clear unnecessary tables; +DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index e7c814ef0..54a8777ce 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -64,6 +64,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-4'; ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-6'; +ALTER EXTENSION citus UPDATE TO '6.1-7'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/sql/multi_distribution_metadata.sql b/src/test/regress/sql/multi_distribution_metadata.sql index 41ad310e7..90b642bb7 100644 --- a/src/test/regress/sql/multi_distribution_metadata.sql +++ b/src/test/regress/sql/multi_distribution_metadata.sql @@ -202,3 +202,99 @@ COMMIT; -- lock should be gone now SELECT COUNT(*) FROM pg_locks WHERE locktype = 'advisory' AND objid = 5; + +-- test get_shard_id_for_distribution_column +SET citus.shard_count TO 4; +CREATE TABLE get_shardid_test_table1(column1 int, column2 int); +SELECT create_distributed_table('get_shardid_test_table1', 'column1'); +\COPY get_shardid_test_table1 FROM STDIN with delimiter '|'; +1|1 +2|2 +3|3 +\. +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 1); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 2); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3); + +-- verify result of the get_shard_id_for_distribution_column +\c - - - :worker_1_port +SELECT * FROM get_shardid_test_table1_540006; +SELECT * FROM get_shardid_test_table1_540009; +SELECT * FROM get_shardid_test_table1_540007; +\c - - - :master_port + +-- test non-existing value +SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4); + +-- test array type +SET citus.shard_count TO 4; +CREATE TABLE get_shardid_test_table2(column1 text[], column2 int); +SELECT create_distributed_table('get_shardid_test_table2', 'column1'); +\COPY get_shardid_test_table2 FROM STDIN with delimiter '|'; +{a, b, c}|1 +{d, e, f}|2 +\. +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]); + +-- verify result of the get_shard_id_for_distribution_column +\c - - - :worker_1_port +SELECT * FROM get_shardid_test_table2_540013; +SELECT * FROM get_shardid_test_table2_540011; +\c - - - :master_port + +-- test mismatching data type +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text); + +-- test NULL distribution column value for hash distributed table +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2'); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', NULL); + +-- test non-distributed table +CREATE TABLE get_shardid_test_table3(column1 int, column2 int); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); + +-- test append distributed table +SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append'); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1); + +-- test reference table; +CREATE TABLE get_shardid_test_table4(column1 int, column2 int); +SELECT create_reference_table('get_shardid_test_table4'); + +-- test NULL distribution column value for reference table +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4'); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL); + +-- test different data types for reference table +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]); + +-- test range distributed table +CREATE TABLE get_shardid_test_table5(column1 int, column2 int); +SELECT create_distributed_table('get_shardid_test_table5', 'column1', 'range'); + +-- create worker shards +SELECT master_create_empty_shard('get_shardid_test_table5'); +SELECT master_create_empty_shard('get_shardid_test_table5'); +SELECT master_create_empty_shard('get_shardid_test_table5'); +SELECT master_create_empty_shard('get_shardid_test_table5'); + +-- now the comparison is done via the partition column type, which is text +UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 1000 WHERE shardid = 540015; +UPDATE pg_dist_shard SET shardminvalue = 1001, shardmaxvalue = 2000 WHERE shardid = 540016; +UPDATE pg_dist_shard SET shardminvalue = 2001, shardmaxvalue = 3000 WHERE shardid = 540017; +UPDATE pg_dist_shard SET shardminvalue = 3001, shardmaxvalue = 4000 WHERE shardid = 540018; + +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 5); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 1111); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 2689); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 3248); + +-- test non-existing value for range distributed tables +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 4001); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', -999); + +-- clear unnecessary tables; +DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 46c167837..8dc7a8717 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -64,6 +64,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-4'; ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-6'; +ALTER EXTENSION citus UPDATE TO '6.1-7'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*)