diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 5cbe6130d..566c27115 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -50,10 +50,6 @@ #include "utils/palloc.h" -/* local function forward declarations */ -static text * IntegerToText(int32 value); - - /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_create_worker_shards); @@ -418,7 +414,7 @@ CheckHashPartitionedTable(Oid distributedTableId) /* Helper function to convert an integer value to a text type */ -static text * +text * IntegerToText(int32 value) { text *valueText = NULL; diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index e3e6854fa..52a5c51ba 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -45,7 +45,6 @@ static void RepairShardPlacement(int64 shardId, char *sourceNodeName, static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); -static char * ConstructQualifiedShardName(ShardInterval *shardInterval); static List * RecreateTableDDLCommandList(Oid relationId); /* declarations for dynamic loading */ @@ -350,10 +349,8 @@ CopyShardForeignConstraintCommandList(ShardInterval *shardInterval) /* * ConstuctQualifiedShardName creates the fully qualified name string of the * given shard in ._ format. - * - * FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus. */ -static char * +char * ConstructQualifiedShardName(ShardInterval *shardInterval) { Oid schemaId = get_rel_namespace(shardInterval->relationId); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 6b6b846ef..78094bd28 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -95,8 +95,6 @@ static Task * RouterModifyTask(Query *originalQuery, Query *query); static ShardInterval * TargetShardIntervalForModify(Query *query); static List * QueryRestrictList(Query *query); static bool FastShardPruningPossible(CmdType commandType, char partitionMethod); -static ShardInterval * FastShardPruning(Oid distributedTableId, - Const *partionColumnValue); static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); static Task * RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionContext, @@ -1890,7 +1888,8 @@ TargetShardIntervalForModify(Query *query) { uint32 rangeTableId = 1; Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn); + Const *partitionValueConst = ExtractInsertPartitionValue(query, partitionColumn); + Datum partitionValue = partitionValueConst->constvalue; ShardInterval *shardInterval = FastShardPruning(distributedTableId, partitionValue); @@ -2001,8 +2000,8 @@ FastShardPruningPossible(CmdType commandType, char partitionMethod) * the corresponding shard interval that the partitionValue should be in. FastShardPruning * returns NULL if no ShardIntervals exist for the given partitionValue. */ -static ShardInterval * -FastShardPruning(Oid distributedTableId, Const *partitionValue) +ShardInterval * +FastShardPruning(Oid distributedTableId, Datum partitionValue) { DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); int shardCount = cacheEntry->shardIntervalArrayLength; @@ -2030,9 +2029,8 @@ FastShardPruning(Oid distributedTableId, Const *partitionValue) * Call FindShardInterval to find the corresponding shard interval for the * given partition value. */ - shardInterval = FindShardInterval(partitionValue->constvalue, - sortedShardIntervalArray, shardCount, - partitionMethod, + shardInterval = FindShardInterval(partitionValue, sortedShardIntervalArray, + shardCount, partitionMethod, shardIntervalCompareFunction, hashFunction, useBinarySearch); diff --git a/src/backend/distributed/test/connection_cache.c b/src/backend/distributed/test/connection_cache.c index e88204c5a..525abfc50 100644 --- a/src/backend/distributed/test/connection_cache.c +++ b/src/backend/distributed/test/connection_cache.c @@ -28,10 +28,6 @@ #include "utils/lsyscache.h" -/* local function forward declarations */ -static Datum ExtractIntegerDatum(char *input); - - /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(initialize_remote_temp_table); PG_FUNCTION_INFO_V1(count_remote_temp_table_rows); @@ -98,7 +94,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS) else { char *countText = PQgetvalue(result, 0, 0); - count = ExtractIntegerDatum(countText); + count = StringToDatum(countText, INT4OID); } PQclear(result); @@ -189,24 +185,3 @@ set_connection_status_bad(PG_FUNCTION_ARGS) PG_RETURN_BOOL(true); } - - -/* - * ExtractIntegerDatum transforms an integer in textual form into a Datum. - */ -static Datum -ExtractIntegerDatum(char *input) -{ - Oid typIoFunc = InvalidOid; - Oid typIoParam = InvalidOid; - Datum intDatum = 0; - FmgrInfo fmgrInfo; - memset(&fmgrInfo, 0, sizeof(fmgrInfo)); - - getTypeInputInfo(INT4OID, &typIoFunc, &typIoParam); - fmgr_info(typIoFunc, &fmgrInfo); - - intDatum = InputFunctionCall(&fmgrInfo, input, typIoFunc, -1); - - return intDatum; -} diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index f8bb96931..169510dc4 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -30,6 +30,7 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_router_planner.h" #include "distributed/pg_dist_node.h" #include "distributed/reference_table_utils.h" #include "distributed/shardinterval_utils.h" @@ -40,6 +41,7 @@ #include "storage/fd.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/relcache.h" @@ -179,20 +181,9 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) Datum get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) { - Oid relationId = InvalidOid; - Datum distributionValue = 0; - - Var *distributionColumn = NULL; - char distributionMethod = 0; - Oid expectedElementType = InvalidOid; - Oid inputElementType = InvalidOid; - DistTableCacheEntry *cacheEntry = NULL; - int shardCount = 0; - ShardInterval **shardIntervalArray = NULL; - FmgrInfo *hashFunction = NULL; - FmgrInfo *compareFunction = NULL; - bool useBinarySearch = true; ShardInterval *shardInterval = NULL; + char distributionMethod = 0; + Oid relationId = InvalidOid; /* * To have optional parameter as NULL, we defined this UDF as not strict, therefore @@ -227,6 +218,13 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) else if (distributionMethod == DISTRIBUTE_BY_HASH || distributionMethod == DISTRIBUTE_BY_RANGE) { + Var *distributionColumn = NULL; + Oid distributionDataType = InvalidOid; + Oid inputDataType = InvalidOid; + char *distributionValueString = NULL; + Datum inputDatum = 0; + Datum distributionValueDatum = 0; + /* if given table is not reference table, distributionValue cannot be NULL */ if (PG_ARGISNULL(1)) { @@ -235,36 +233,17 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) "than reference tables."))); } - distributionValue = PG_GETARG_DATUM(1); + inputDatum = PG_GETARG_DATUM(1); + inputDataType = get_fn_expr_argtype(fcinfo->flinfo, 1); + distributionValueString = DatumToString(inputDatum, inputDataType); distributionColumn = PartitionKey(relationId); - expectedElementType = distributionColumn->vartype; - inputElementType = get_fn_expr_argtype(fcinfo->flinfo, 1); - if (expectedElementType != inputElementType) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("invalid distribution value type"), - errdetail("Type of the value does not match the type of the " - "distribution column. Expected type id: %d, given " - "type id: %d", expectedElementType, - inputElementType))); - } + distributionDataType = distributionColumn->vartype; - cacheEntry = DistributedTableCacheEntry(relationId); + distributionValueDatum = StringToDatum(distributionValueString, + distributionDataType); - if (distributionMethod == DISTRIBUTE_BY_HASH && - cacheEntry->hasUniformHashDistribution) - { - useBinarySearch = false; - } - - shardCount = cacheEntry->shardIntervalArrayLength; - shardIntervalArray = cacheEntry->sortedShardIntervalArray; - hashFunction = cacheEntry->hashFunction; - compareFunction = cacheEntry->shardIntervalCompareFunction; - shardInterval = FindShardInterval(distributionValue, shardIntervalArray, - shardCount, distributionMethod, compareFunction, - hashFunction, useBinarySearch); + shardInterval = FastShardPruning(relationId, distributionValueDatum); } else { @@ -940,3 +919,40 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) return workerNode; } + + +/* + * StringToDatum transforms a string representation into a Datum. + */ +Datum +StringToDatum(char *inputString, Oid dataType) +{ + Oid typIoFunc = InvalidOid; + Oid typIoParam = InvalidOid; + int32 typeModifier = -1; + Datum datum = 0; + + getTypeInputInfo(dataType, &typIoFunc, &typIoParam); + getBaseTypeAndTypmod(dataType, &typeModifier); + + datum = OidInputFunctionCall(typIoFunc, inputString, typIoParam, typeModifier); + + return datum; +} + + +/* + * DatumToString returns the string representation of the given datum. + */ +char * +DatumToString(Datum datum, Oid dataType) +{ + char *outputString = NULL; + Oid typIoFunc = InvalidOid; + bool typIsVarlena = false; + + getTypeOutputInfo(dataType, &typIoFunc, &typIsVarlena); + outputString = OidOutputFunctionCall(typIoFunc, datum); + + return outputString; +} diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 9e24c8570..96aebf045 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -46,4 +46,6 @@ extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword); + + #endif /* CONNECTION_CACHE_H */ diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 8198e677c..74a9f10b5 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -98,5 +98,9 @@ extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSuperUser(void); extern bool TableReferenced(Oid relationId); +extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); +extern Datum StringToDatum(char *inputString, Oid dataType); +extern char * DatumToString(Datum datum, Oid dataType); + #endif /* MASTER_METADATA_UTILITY_H */ diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 72a2baebf..edd0ea653 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -20,6 +20,7 @@ #include "distributed/shardinterval_utils.h" #include "nodes/pg_list.h" +#include "distributed/master_metadata_utility.h" /* @@ -118,6 +119,7 @@ extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, char **tableName); +extern text * IntegerToText(int32 value); /* Function declarations for generating metadata for shard and placement creation */ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); @@ -139,6 +141,7 @@ extern Datum master_drop_all_shards(PG_FUNCTION_ARGS); /* function declarations for shard creation functionality */ extern Datum master_create_worker_shards(PG_FUNCTION_ARGS); +extern Datum isolate_tenant_to_new_shard(PG_FUNCTION_ARGS); /* function declarations for shard repair functionality */ extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index a215a48df..9fac89427 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -41,6 +41,7 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); +extern ShardInterval * FastShardPruning(Oid distributedTableId, Datum partitionValue); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 6a95e99ca..a8f074be3 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -155,5 +155,8 @@ extern Datum worker_append_table_to_shard(PG_FUNCTION_ARGS); extern Datum worker_foreign_file_path(PG_FUNCTION_ARGS); extern Datum worker_find_block_local_path(PG_FUNCTION_ARGS); +/* Function declaration for calculating hashed value */ +extern Datum worker_hash(PG_FUNCTION_ARGS); + #endif /* WORKER_PROTOCOL_H */ diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index aebb04d65..e4e329b40 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -366,13 +366,13 @@ SELECT create_distributed_table('get_shardid_test_table2', 'column1'); (1 row) \COPY get_shardid_test_table2 FROM STDIN with delimiter '|'; -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'); get_shard_id_for_distribution_column -------------------------------------- 540013 (1 row) -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'); get_shard_id_for_distribution_column -------------------------------------- 540011 @@ -394,9 +394,9 @@ SELECT * FROM get_shardid_test_table2_540011; \c - - - :master_port -- test mismatching data type -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text); -ERROR: invalid distribution value type -DETAIL: Type of the value does not match the type of the distribution column. Expected type id: 1009, given type id: 25 +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'); +ERROR: malformed array literal: "a" +DETAIL: Array value must start with "{" or dimension information. -- test NULL distribution column value for hash distributed table SELECT get_shard_id_for_distribution_column('get_shardid_test_table2'); ERROR: distribution value cannot be NULL for tables other than reference tables. @@ -443,13 +443,13 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1); 540014 (1 row) -SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'); get_shard_id_for_distribution_column -------------------------------------- 540014 (1 row) -SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'); get_shard_id_for_distribution_column -------------------------------------- 540014 diff --git a/src/test/regress/sql/multi_distribution_metadata.sql b/src/test/regress/sql/multi_distribution_metadata.sql index 90b642bb7..02c2be792 100644 --- a/src/test/regress/sql/multi_distribution_metadata.sql +++ b/src/test/regress/sql/multi_distribution_metadata.sql @@ -234,8 +234,8 @@ SELECT create_distributed_table('get_shardid_test_table2', 'column1'); {a, b, c}|1 {d, e, f}|2 \. -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]); -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'); -- verify result of the get_shard_id_for_distribution_column \c - - - :worker_1_port @@ -244,7 +244,7 @@ SELECT * FROM get_shardid_test_table2_540011; \c - - - :master_port -- test mismatching data type -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'); -- test NULL distribution column value for hash distributed table SELECT get_shard_id_for_distribution_column('get_shardid_test_table2'); @@ -268,8 +268,8 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL); -- test different data types for reference table SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1); -SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text); -SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'); -- test range distributed table CREATE TABLE get_shardid_test_table5(column1 int, column2 int);