mirror of https://github.com/citusdata/citus.git
Refactor get_shard_id_for_distribution_column() and other minor changes
parent
ed77260aa1
commit
93e626c896
|
@ -50,10 +50,6 @@
|
|||
#include "utils/palloc.h"
|
||||
|
||||
|
||||
/* local function forward declarations */
|
||||
static text * IntegerToText(int32 value);
|
||||
|
||||
|
||||
/* declarations for dynamic loading */
|
||||
PG_FUNCTION_INFO_V1(master_create_worker_shards);
|
||||
|
||||
|
@ -418,7 +414,7 @@ CheckHashPartitionedTable(Oid distributedTableId)
|
|||
|
||||
|
||||
/* Helper function to convert an integer value to a text type */
|
||||
static text *
|
||||
text *
|
||||
IntegerToText(int32 value)
|
||||
{
|
||||
text *valueText = NULL;
|
||||
|
|
|
@ -45,7 +45,6 @@ static void RepairShardPlacement(int64 shardId, char *sourceNodeName,
|
|||
static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName,
|
||||
int32 sourceNodePort, char *targetNodeName,
|
||||
int32 targetNodePort);
|
||||
static char * ConstructQualifiedShardName(ShardInterval *shardInterval);
|
||||
static List * RecreateTableDDLCommandList(Oid relationId);
|
||||
|
||||
/* declarations for dynamic loading */
|
||||
|
@ -350,10 +349,8 @@ CopyShardForeignConstraintCommandList(ShardInterval *shardInterval)
|
|||
/*
|
||||
* ConstructQualifiedShardName creates the fully qualified name string of the
|
||||
* given shard in <schema>.<table_name>_<shard_id> format.
|
||||
*
|
||||
* FIXME: Copied from Citus-MX, should be removed once those changes are checked in to Citus.
|
||||
*/
|
||||
static char *
|
||||
char *
|
||||
ConstructQualifiedShardName(ShardInterval *shardInterval)
|
||||
{
|
||||
Oid schemaId = get_rel_namespace(shardInterval->relationId);
|
||||
|
|
|
@ -95,8 +95,6 @@ static Task * RouterModifyTask(Query *originalQuery, Query *query);
|
|||
static ShardInterval * TargetShardIntervalForModify(Query *query);
|
||||
static List * QueryRestrictList(Query *query);
|
||||
static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
|
||||
static ShardInterval * FastShardPruning(Oid distributedTableId,
|
||||
Const *partionColumnValue);
|
||||
static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
|
||||
static Task * RouterSelectTask(Query *originalQuery,
|
||||
RelationRestrictionContext *restrictionContext,
|
||||
|
@ -1890,7 +1888,8 @@ TargetShardIntervalForModify(Query *query)
|
|||
{
|
||||
uint32 rangeTableId = 1;
|
||||
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
||||
Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn);
|
||||
Const *partitionValueConst = ExtractInsertPartitionValue(query, partitionColumn);
|
||||
Datum partitionValue = partitionValueConst->constvalue;
|
||||
ShardInterval *shardInterval = FastShardPruning(distributedTableId,
|
||||
partitionValue);
|
||||
|
||||
|
@ -2001,8 +2000,8 @@ FastShardPruningPossible(CmdType commandType, char partitionMethod)
|
|||
* the corresponding shard interval that the partitionValue should be in. FastShardPruning
|
||||
* returns NULL if no ShardIntervals exist for the given partitionValue.
|
||||
*/
|
||||
static ShardInterval *
|
||||
FastShardPruning(Oid distributedTableId, Const *partitionValue)
|
||||
ShardInterval *
|
||||
FastShardPruning(Oid distributedTableId, Datum partitionValue)
|
||||
{
|
||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
||||
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||
|
@ -2030,9 +2029,8 @@ FastShardPruning(Oid distributedTableId, Const *partitionValue)
|
|||
* Call FindShardInterval to find the corresponding shard interval for the
|
||||
* given partition value.
|
||||
*/
|
||||
shardInterval = FindShardInterval(partitionValue->constvalue,
|
||||
sortedShardIntervalArray, shardCount,
|
||||
partitionMethod,
|
||||
shardInterval = FindShardInterval(partitionValue, sortedShardIntervalArray,
|
||||
shardCount, partitionMethod,
|
||||
shardIntervalCompareFunction, hashFunction,
|
||||
useBinarySearch);
|
||||
|
||||
|
|
|
@ -28,10 +28,6 @@
|
|||
#include "utils/lsyscache.h"
|
||||
|
||||
|
||||
/* local function forward declarations */
|
||||
static Datum ExtractIntegerDatum(char *input);
|
||||
|
||||
|
||||
/* declarations for dynamic loading */
|
||||
PG_FUNCTION_INFO_V1(initialize_remote_temp_table);
|
||||
PG_FUNCTION_INFO_V1(count_remote_temp_table_rows);
|
||||
|
@ -98,7 +94,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS)
|
|||
else
|
||||
{
|
||||
char *countText = PQgetvalue(result, 0, 0);
|
||||
count = ExtractIntegerDatum(countText);
|
||||
count = StringToDatum(countText, INT4OID);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
@ -189,24 +185,3 @@ set_connection_status_bad(PG_FUNCTION_ARGS)
|
|||
|
||||
PG_RETURN_BOOL(true);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExtractIntegerDatum transforms an integer in textual form into a Datum.
|
||||
*/
|
||||
static Datum
|
||||
ExtractIntegerDatum(char *input)
|
||||
{
|
||||
Oid typIoFunc = InvalidOid;
|
||||
Oid typIoParam = InvalidOid;
|
||||
Datum intDatum = 0;
|
||||
FmgrInfo fmgrInfo;
|
||||
memset(&fmgrInfo, 0, sizeof(fmgrInfo));
|
||||
|
||||
getTypeInputInfo(INT4OID, &typIoFunc, &typIoParam);
|
||||
fmgr_info(typIoFunc, &fmgrInfo);
|
||||
|
||||
intDatum = InputFunctionCall(&fmgrInfo, input, typIoFunc, -1);
|
||||
|
||||
return intDatum;
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_router_planner.h"
|
||||
#include "distributed/pg_dist_node.h"
|
||||
#include "distributed/reference_table_utils.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
|
@ -40,6 +41,7 @@
|
|||
#include "storage/fd.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relcache.h"
|
||||
|
||||
|
@ -179,20 +181,9 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
|
|||
Datum
|
||||
get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = InvalidOid;
|
||||
Datum distributionValue = 0;
|
||||
|
||||
Var *distributionColumn = NULL;
|
||||
char distributionMethod = 0;
|
||||
Oid expectedElementType = InvalidOid;
|
||||
Oid inputElementType = InvalidOid;
|
||||
DistTableCacheEntry *cacheEntry = NULL;
|
||||
int shardCount = 0;
|
||||
ShardInterval **shardIntervalArray = NULL;
|
||||
FmgrInfo *hashFunction = NULL;
|
||||
FmgrInfo *compareFunction = NULL;
|
||||
bool useBinarySearch = true;
|
||||
ShardInterval *shardInterval = NULL;
|
||||
char distributionMethod = 0;
|
||||
Oid relationId = InvalidOid;
|
||||
|
||||
/*
|
||||
* To have optional parameter as NULL, we defined this UDF as not strict, therefore
|
||||
|
@ -227,6 +218,13 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
|||
else if (distributionMethod == DISTRIBUTE_BY_HASH ||
|
||||
distributionMethod == DISTRIBUTE_BY_RANGE)
|
||||
{
|
||||
Var *distributionColumn = NULL;
|
||||
Oid distributionDataType = InvalidOid;
|
||||
Oid inputDataType = InvalidOid;
|
||||
char *distributionValueString = NULL;
|
||||
Datum inputDatum = 0;
|
||||
Datum distributionValueDatum = 0;
|
||||
|
||||
/* if the given table is not a reference table, distributionValue cannot be NULL */
|
||||
if (PG_ARGISNULL(1))
|
||||
{
|
||||
|
@ -235,36 +233,17 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
|||
"than reference tables.")));
|
||||
}
|
||||
|
||||
distributionValue = PG_GETARG_DATUM(1);
|
||||
inputDatum = PG_GETARG_DATUM(1);
|
||||
inputDataType = get_fn_expr_argtype(fcinfo->flinfo, 1);
|
||||
distributionValueString = DatumToString(inputDatum, inputDataType);
|
||||
|
||||
distributionColumn = PartitionKey(relationId);
|
||||
expectedElementType = distributionColumn->vartype;
|
||||
inputElementType = get_fn_expr_argtype(fcinfo->flinfo, 1);
|
||||
if (expectedElementType != inputElementType)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("invalid distribution value type"),
|
||||
errdetail("Type of the value does not match the type of the "
|
||||
"distribution column. Expected type id: %d, given "
|
||||
"type id: %d", expectedElementType,
|
||||
inputElementType)));
|
||||
}
|
||||
distributionDataType = distributionColumn->vartype;
|
||||
|
||||
cacheEntry = DistributedTableCacheEntry(relationId);
|
||||
distributionValueDatum = StringToDatum(distributionValueString,
|
||||
distributionDataType);
|
||||
|
||||
if (distributionMethod == DISTRIBUTE_BY_HASH &&
|
||||
cacheEntry->hasUniformHashDistribution)
|
||||
{
|
||||
useBinarySearch = false;
|
||||
}
|
||||
|
||||
shardCount = cacheEntry->shardIntervalArrayLength;
|
||||
shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
||||
hashFunction = cacheEntry->hashFunction;
|
||||
compareFunction = cacheEntry->shardIntervalCompareFunction;
|
||||
shardInterval = FindShardInterval(distributionValue, shardIntervalArray,
|
||||
shardCount, distributionMethod, compareFunction,
|
||||
hashFunction, useBinarySearch);
|
||||
shardInterval = FastShardPruning(relationId, distributionValueDatum);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -940,3 +919,40 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
|||
|
||||
return workerNode;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StringToDatum transforms a string representation into a Datum.
|
||||
*/
|
||||
Datum
|
||||
StringToDatum(char *inputString, Oid dataType)
|
||||
{
|
||||
Oid typIoFunc = InvalidOid;
|
||||
Oid typIoParam = InvalidOid;
|
||||
int32 typeModifier = -1;
|
||||
Datum datum = 0;
|
||||
|
||||
getTypeInputInfo(dataType, &typIoFunc, &typIoParam);
|
||||
getBaseTypeAndTypmod(dataType, &typeModifier);
|
||||
|
||||
datum = OidInputFunctionCall(typIoFunc, inputString, typIoParam, typeModifier);
|
||||
|
||||
return datum;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DatumToString returns the string representation of the given datum.
|
||||
*/
|
||||
char *
|
||||
DatumToString(Datum datum, Oid dataType)
|
||||
{
|
||||
char *outputString = NULL;
|
||||
Oid typIoFunc = InvalidOid;
|
||||
bool typIsVarlena = false;
|
||||
|
||||
getTypeOutputInfo(dataType, &typIoFunc, &typIsVarlena);
|
||||
outputString = OidOutputFunctionCall(typIoFunc, datum);
|
||||
|
||||
return outputString;
|
||||
}
|
||||
|
|
|
@ -46,4 +46,6 @@ extern void WarnRemoteError(PGconn *connection, PGresult *result);
|
|||
extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
|
||||
extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);
|
||||
extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
||||
|
||||
|
||||
#endif /* CONNECTION_CACHE_H */
|
||||
|
|
|
@ -98,5 +98,9 @@ extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
|||
extern void EnsureTableOwner(Oid relationId);
|
||||
extern void EnsureSuperUser(void);
|
||||
extern bool TableReferenced(Oid relationId);
|
||||
extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
|
||||
extern Datum StringToDatum(char *inputString, Oid dataType);
|
||||
extern char * DatumToString(Datum datum, Oid dataType);
|
||||
|
||||
|
||||
#endif /* MASTER_METADATA_UTILITY_H */
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
|
||||
|
||||
/*
|
||||
|
@ -118,6 +119,7 @@ extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
|
|||
extern void CheckHashPartitionedTable(Oid distributedTableId);
|
||||
extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName,
|
||||
char **tableName);
|
||||
extern text * IntegerToText(int32 value);
|
||||
|
||||
/* Function declarations for generating metadata for shard and placement creation */
|
||||
extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);
|
||||
|
@ -139,6 +141,7 @@ extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);
|
|||
|
||||
/* function declarations for shard creation functionality */
|
||||
extern Datum master_create_worker_shards(PG_FUNCTION_ARGS);
|
||||
extern Datum isolate_tenant_to_new_shard(PG_FUNCTION_ARGS);
|
||||
|
||||
/* function declarations for shard repair functionality */
|
||||
extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS);
|
||||
|
|
|
@ -41,6 +41,7 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
|||
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
|
||||
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
|
||||
ShardInterval *shardInterval);
|
||||
extern ShardInterval * FastShardPruning(Oid distributedTableId, Datum partitionValue);
|
||||
|
||||
|
||||
#endif /* MULTI_ROUTER_PLANNER_H */
|
||||
|
|
|
@ -155,5 +155,8 @@ extern Datum worker_append_table_to_shard(PG_FUNCTION_ARGS);
|
|||
extern Datum worker_foreign_file_path(PG_FUNCTION_ARGS);
|
||||
extern Datum worker_find_block_local_path(PG_FUNCTION_ARGS);
|
||||
|
||||
/* Function declaration for calculating hashed value */
|
||||
extern Datum worker_hash(PG_FUNCTION_ARGS);
|
||||
|
||||
|
||||
#endif /* WORKER_PROTOCOL_H */
|
||||
|
|
|
@ -366,13 +366,13 @@ SELECT create_distributed_table('get_shardid_test_table2', 'column1');
|
|||
(1 row)
|
||||
|
||||
\COPY get_shardid_test_table2 FROM STDIN with delimiter '|';
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}');
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540013
|
||||
(1 row)
|
||||
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}');
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540011
|
||||
|
@ -394,9 +394,9 @@ SELECT * FROM get_shardid_test_table2_540011;
|
|||
|
||||
\c - - - :master_port
|
||||
-- test mismatching data type
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
|
||||
ERROR: invalid distribution value type
|
||||
DETAIL: Type of the value does not match the type of the distribution column. Expected type id: 1009, given type id: 25
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a');
|
||||
ERROR: malformed array literal: "a"
|
||||
DETAIL: Array value must start with "{" or dimension information.
|
||||
-- test NULL distribution column value for hash distributed table
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2');
|
||||
ERROR: distribution value cannot be NULL for tables other than reference tables.
|
||||
|
@ -443,13 +443,13 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1);
|
|||
540014
|
||||
(1 row)
|
||||
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a');
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540014
|
||||
(1 row)
|
||||
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}');
|
||||
get_shard_id_for_distribution_column
|
||||
--------------------------------------
|
||||
540014
|
||||
|
|
|
@ -234,8 +234,8 @@ SELECT create_distributed_table('get_shardid_test_table2', 'column1');
|
|||
{a, b, c}|1
|
||||
{d, e, f}|2
|
||||
\.
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}');
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}');
|
||||
|
||||
-- verify result of the get_shard_id_for_distribution_column
|
||||
\c - - - :worker_1_port
|
||||
|
@ -244,7 +244,7 @@ SELECT * FROM get_shardid_test_table2_540011;
|
|||
\c - - - :master_port
|
||||
|
||||
-- test mismatching data type
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a');
|
||||
|
||||
-- test NULL distribution column value for hash distributed table
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2');
|
||||
|
@ -268,8 +268,8 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL);
|
|||
|
||||
-- test different data types for reference table
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]);
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a');
|
||||
SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}');
|
||||
|
||||
-- test range distributed table
|
||||
CREATE TABLE get_shardid_test_table5(column1 int, column2 int);
|
||||
|
|
Loading…
Reference in New Issue