mirror of https://github.com/citusdata/citus.git
Address reviews - Part 1
- Add support for range distributed tables and reference tables
- Make the UDF non-strict and add NULL checks
- Return NULL if the shard cannot be found (possible for range distributed tables)
- Use the "any" pseudo-type instead of anyelement
- Add a permission check for the relation
- Use hashing to search for the shard where appropriate
- Improve error messages
pull/1048/head
parent
b7972ed056
commit
e1f469e92b
|
@ -155,8 +155,8 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
|
||||||
Datum
|
Datum
|
||||||
get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = InvalidOid;
|
||||||
Datum distributionValue = PG_GETARG_DATUM(1);
|
Datum distributionValue = 0;
|
||||||
|
|
||||||
Var *distributionColumn = NULL;
|
Var *distributionColumn = NULL;
|
||||||
char distributionMethod = 0;
|
char distributionMethod = 0;
|
||||||
|
@ -170,6 +170,19 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||||
bool useBinarySearch = true;
|
bool useBinarySearch = true;
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* To have optional parameter as NULL, we defined this UDF as not strict, therefore
|
||||||
|
* we need to check all parameters for NULL values.
|
||||||
|
*/
|
||||||
|
if (PG_ARGISNULL(0))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
||||||
|
errmsg("relation cannot be NULL")));
|
||||||
|
}
|
||||||
|
|
||||||
|
relationId = PG_GETARG_OID(0);
|
||||||
|
EnsureTablePermissions(relationId, ACL_SELECT);
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsDistributedTable(relationId))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||||
|
@ -177,34 +190,66 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
distributionMethod = PartitionMethod(relationId);
|
distributionMethod = PartitionMethod(relationId);
|
||||||
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
if (distributionMethod == DISTRIBUTE_BY_NONE)
|
||||||
|
{
|
||||||
|
shardInterval = (ShardInterval *) linitial(LoadShardIntervalList(relationId));
|
||||||
|
}
|
||||||
|
else if (distributionMethod == DISTRIBUTE_BY_HASH ||
|
||||||
|
distributionMethod == DISTRIBUTE_BY_RANGE)
|
||||||
|
{
|
||||||
|
/* if given table is not reference table, distributionValue cannot be NULL */
|
||||||
|
if (PG_ARGISNULL(1))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
||||||
|
errmsg("distribution value cannot be NULL for tables other "
|
||||||
|
"than reference tables.")));
|
||||||
|
}
|
||||||
|
|
||||||
|
distributionValue = PG_GETARG_DATUM(1);
|
||||||
|
|
||||||
|
distributionColumn = PartitionKey(relationId);
|
||||||
|
expectedElementType = distributionColumn->vartype;
|
||||||
|
inputElementType = get_fn_expr_argtype(fcinfo->flinfo, 1);
|
||||||
|
if (expectedElementType != inputElementType)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||||
|
errmsg("invalid distribution value type"),
|
||||||
|
errdetail("Type of the value does not match the type of the "
|
||||||
|
"distribution column. Expected type id: %d, given "
|
||||||
|
"type id: %d", expectedElementType,
|
||||||
|
inputElementType)));
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheEntry = DistributedTableCacheEntry(relationId);
|
||||||
|
|
||||||
|
if (distributionMethod == DISTRIBUTE_BY_HASH &&
|
||||||
|
cacheEntry->hasUniformHashDistribution)
|
||||||
|
{
|
||||||
|
useBinarySearch = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
shardCount = cacheEntry->shardIntervalArrayLength;
|
||||||
|
shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
||||||
|
hashFunction = cacheEntry->hashFunction;
|
||||||
|
compareFunction = cacheEntry->shardIntervalCompareFunction;
|
||||||
|
shardInterval = FindShardInterval(distributionValue, shardIntervalArray,
|
||||||
|
shardCount, distributionMethod, compareFunction,
|
||||||
|
hashFunction, useBinarySearch);
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("finding shard id of given distribution value is not "
|
errmsg("finding shard id of given distribution value is only "
|
||||||
"supported for non-hash partitioned tables")));
|
"supported for hash partitioned tables, range partitioned "
|
||||||
|
"tables and reference tables.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
distributionColumn = PartitionKey(relationId);
|
if (shardInterval != NULL)
|
||||||
expectedElementType = distributionColumn->vartype;
|
|
||||||
inputElementType = get_fn_expr_argtype(fcinfo->flinfo, 1);
|
|
||||||
if (expectedElementType != inputElementType)
|
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
PG_RETURN_INT64(shardInterval->shardId);
|
||||||
errmsg("invalid distribution value type"),
|
|
||||||
errdetail("Type of the value does not match the type of the "
|
|
||||||
"distribution column.")));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cacheEntry = DistributedTableCacheEntry(relationId);
|
PG_RETURN_INT64(NULL);
|
||||||
shardCount = cacheEntry->shardIntervalArrayLength;
|
|
||||||
shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
|
||||||
hashFunction = cacheEntry->hashFunction;
|
|
||||||
compareFunction = cacheEntry->shardIntervalCompareFunction;
|
|
||||||
shardInterval = FindShardInterval(distributionValue, shardIntervalArray, shardCount,
|
|
||||||
distributionMethod, compareFunction, hashFunction,
|
|
||||||
useBinarySearch);
|
|
||||||
|
|
||||||
PG_RETURN_INT64(shardInterval->shardId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -396,12 +396,17 @@ SELECT * FROM get_shardid_test_table2_540011;
|
||||||
-- test mismatching data type
|
-- test mismatching data type
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
|
||||||
ERROR: invalid distribution value type
|
ERROR: invalid distribution value type
|
||||||
DETAIL: Type of the value does not match the type of the distribution column.
|
DETAIL: Type of the value does not match the type of the distribution column. Expected type id: 1009, given type id: 25
|
||||||
|
-- test NULL distribution column value for hash distributed table
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2');
|
||||||
|
ERROR: distribution value cannot be NULL for tables other than reference tables.
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', NULL);
|
||||||
|
ERROR: distribution value cannot be NULL for tables other than reference tables.
|
||||||
-- test non-distributed table
|
-- test non-distributed table
|
||||||
CREATE TABLE get_shardid_test_table3(column1 int, column2 int);
|
CREATE TABLE get_shardid_test_table3(column1 int, column2 int);
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
||||||
ERROR: relation is not distributed
|
ERROR: relation is not distributed
|
||||||
-- test non-hash distributed table
|
-- test append distributed table
|
||||||
SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
|
SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
--------------------------
|
||||||
|
@ -409,6 +414,121 @@ SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
||||||
ERROR: finding shard id of given distribution value is not supported for non-hash partitioned tables
|
ERROR: finding shard id of given distribution value is only supported for hash partitioned tables, range partitioned tables and reference tables.
|
||||||
|
-- test reference table;
|
||||||
|
CREATE TABLE get_shardid_test_table4(column1 int, column2 int);
|
||||||
|
SELECT create_reference_table('get_shardid_test_table4');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test NULL distribution column value for reference table
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4');
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
540014
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
540014
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test different data types for reference table
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
540014
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
540014
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
540014
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test range distributed table
|
||||||
|
CREATE TABLE get_shardid_test_table5(column1 int, column2 int);
|
||||||
|
SELECT create_distributed_table('get_shardid_test_table5', 'column1', 'range');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- create worker shards
|
||||||
|
SELECT master_create_empty_shard('get_shardid_test_table5');
|
||||||
|
master_create_empty_shard
|
||||||
|
---------------------------
|
||||||
|
540015
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('get_shardid_test_table5');
|
||||||
|
master_create_empty_shard
|
||||||
|
---------------------------
|
||||||
|
540016
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('get_shardid_test_table5');
|
||||||
|
master_create_empty_shard
|
||||||
|
---------------------------
|
||||||
|
540017
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('get_shardid_test_table5');
|
||||||
|
master_create_empty_shard
|
||||||
|
---------------------------
|
||||||
|
540018
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- now the comparison is done via the partition column type, which is int
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 1000 WHERE shardid = 540015;
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1001, shardmaxvalue = 2000 WHERE shardid = 540016;
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 2001, shardmaxvalue = 3000 WHERE shardid = 540017;
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 3001, shardmaxvalue = 4000 WHERE shardid = 540018;
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 5);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
540015
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 1111);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
540016
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 2689);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
540017
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 3248);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
540018
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test non-existing value for range distributed tables
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 4001);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', -999);
|
||||||
|
get_shard_id_for_distribution_column
|
||||||
|
--------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- clear unnecessary tables;
|
-- clear unnecessary tables;
|
||||||
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3;
|
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5;
|
||||||
|
|
|
@ -246,13 +246,55 @@ SELECT * FROM get_shardid_test_table2_540011;
|
||||||
-- test mismatching data type
|
-- test mismatching data type
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
|
||||||
|
|
||||||
|
-- test NULL distribution column value for hash distributed table
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2');
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', NULL);
|
||||||
|
|
||||||
-- test non-distributed table
|
-- test non-distributed table
|
||||||
CREATE TABLE get_shardid_test_table3(column1 int, column2 int);
|
CREATE TABLE get_shardid_test_table3(column1 int, column2 int);
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
||||||
|
|
||||||
-- test non-hash distributed table
|
-- test append distributed table
|
||||||
SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
|
SELECT create_distributed_table('get_shardid_test_table3', 'column1', 'append');
|
||||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table3', 1);
|
||||||
|
|
||||||
|
-- test reference table;
|
||||||
|
CREATE TABLE get_shardid_test_table4(column1 int, column2 int);
|
||||||
|
SELECT create_reference_table('get_shardid_test_table4');
|
||||||
|
|
||||||
|
-- test NULL distribution column value for reference table
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4');
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL);
|
||||||
|
|
||||||
|
-- test different data types for reference table
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1);
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text);
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]);
|
||||||
|
|
||||||
|
-- test range distributed table
|
||||||
|
CREATE TABLE get_shardid_test_table5(column1 int, column2 int);
|
||||||
|
SELECT create_distributed_table('get_shardid_test_table5', 'column1', 'range');
|
||||||
|
|
||||||
|
-- create worker shards
|
||||||
|
SELECT master_create_empty_shard('get_shardid_test_table5');
|
||||||
|
SELECT master_create_empty_shard('get_shardid_test_table5');
|
||||||
|
SELECT master_create_empty_shard('get_shardid_test_table5');
|
||||||
|
SELECT master_create_empty_shard('get_shardid_test_table5');
|
||||||
|
|
||||||
|
-- now the comparison is done via the partition column type, which is int
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 1000 WHERE shardid = 540015;
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1001, shardmaxvalue = 2000 WHERE shardid = 540016;
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 2001, shardmaxvalue = 3000 WHERE shardid = 540017;
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 3001, shardmaxvalue = 4000 WHERE shardid = 540018;
|
||||||
|
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 5);
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 1111);
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 2689);
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 3248);
|
||||||
|
|
||||||
|
-- test non-existing value for range distributed tables
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', 4001);
|
||||||
|
SELECT get_shard_id_for_distribution_column('get_shardid_test_table5', -999);
|
||||||
|
|
||||||
-- clear unnecessary tables;
|
-- clear unnecessary tables;
|
||||||
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3;
|
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5;
|
||||||
|
|
Loading…
Reference in New Issue