diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index d7065afc8..541358ca5 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -127,6 +127,8 @@ $(EXTENSION)--6.1-15.sql: $(EXTENSION)--6.1-14.sql $(EXTENSION)--6.1-14--6.1-15. cat $^ > $@ $(EXTENSION)--6.1-16.sql: $(EXTENSION)--6.1-15.sql $(EXTENSION)--6.1-15--6.1-16.sql cat $^ > $@ +$(EXTENSION)--6.1-17.sql: $(EXTENSION)--6.1-16.sql $(EXTENSION)--6.1-16--6.1-17.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-16--6.1-17.sql b/src/backend/distributed/citus--6.1-16--6.1-17.sql new file mode 100644 index 000000000..5a4c2d834 --- /dev/null +++ b/src/backend/distributed/citus--6.1-16--6.1-17.sql @@ -0,0 +1,19 @@ +/* citus--6.1-16--6.1-17.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text DEFAULT '') + RETURNS bigint + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$isolate_tenant_to_new_shard$$; +COMMENT ON FUNCTION isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text) + IS 'isolate a tenant to its own shard and return the new shard id'; + +CREATE FUNCTION worker_hash(value "any") + RETURNS integer + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_hash$$; +COMMENT ON FUNCTION worker_hash(value "any") + IS 'calculate hashed value and return it'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index f0e1fb685..57d2534ec 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-16' +default_version = '6.1-17' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 5cbe6130d..566c27115 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -50,10 +50,6 @@ #include "utils/palloc.h" -/* local function forward declarations */ -static text * IntegerToText(int32 value); - - /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_create_worker_shards); @@ -418,7 +414,7 @@ CheckHashPartitionedTable(Oid distributedTableId) /* Helper function to convert an integer value to a text type */ -static text * +text * IntegerToText(int32 value) { text *valueText = NULL; diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index e3e6854fa..52a5c51ba 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -45,7 +45,6 @@ static void RepairShardPlacement(int64 shardId, char *sourceNodeName, static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); -static char * ConstructQualifiedShardName(ShardInterval *shardInterval); static List * RecreateTableDDLCommandList(Oid relationId); /* declarations for dynamic loading */ @@ -350,10 +349,8 @@ CopyShardForeignConstraintCommandList(ShardInterval *shardInterval) /* * ConstuctQualifiedShardName creates the fully qualified name string of the * given shard in ._ format. - * - * FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus. */ -static char * +char * ConstructQualifiedShardName(ShardInterval *shardInterval) { Oid schemaId = get_rel_namespace(shardInterval->relationId); diff --git a/src/backend/distributed/master/master_split_shards.c b/src/backend/distributed/master/master_split_shards.c new file mode 100644 index 000000000..51c6a0543 --- /dev/null +++ b/src/backend/distributed/master/master_split_shards.c @@ -0,0 +1,89 @@ +/*------------------------------------------------------------------------- + * + * master_split_shards.c + * + * This file contains functions to split a shard according to a given + * distribution column value. + * + * Copyright (c) 2014-2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "c.h" +#include "fmgr.h" + +#include "catalog/pg_class.h" +#include "distributed/colocation_utils.h" +#include "distributed/connection_cache.h" +#include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" +#include "distributed/multi_join_order.h" +#include "distributed/multi_router_planner.h" +#include "distributed/pg_dist_partition.h" +#include "distributed/pg_dist_shard.h" +#include "distributed/remote_commands.h" +#include "distributed/resource_lock.h" +#include "distributed/worker_manager.h" +#include "distributed/worker_protocol.h" +#include "distributed/worker_transaction.h" +#include "nodes/pg_list.h" +#include "storage/lock.h" +#include "utils/builtins.h" +#include "utils/elog.h" +#include "utils/errcodes.h" +#include "utils/lsyscache.h" +#include "utils/typcache.h" + + +/* declarations for dynamic loading */ +PG_FUNCTION_INFO_V1(isolate_tenant_to_new_shard); +PG_FUNCTION_INFO_V1(worker_hash); + + +/* + * isolate_tenant_to_new_shard isolates a tenant to its own shard by spliting + * the current matching shard. + */ +Datum +isolate_tenant_to_new_shard(PG_FUNCTION_ARGS) +{ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("isolate_tenant_to_new_shard() is only supported on " + "Citus Enterprise"))); +} + + +/* + * worker_hash returns the hashed value of the given value. + */ +Datum +worker_hash(PG_FUNCTION_ARGS) +{ + Datum valueDatum = PG_GETARG_DATUM(0); + Datum hashedValueDatum = 0; + TypeCacheEntry *typeEntry = NULL; + FmgrInfo *hashFunction = NULL; + Oid valueDataType = InvalidOid; + + /* figure out hash function from the data type */ + valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0); + typeEntry = lookup_type_cache(valueDataType, TYPECACHE_HASH_PROC_FINFO); + + if (typeEntry->hash_proc_finfo.fn_oid == InvalidOid) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot find a hash function for the input type"), + errhint("Cast input to a data type with a hash function."))); + } + + hashFunction = palloc0(sizeof(FmgrInfo)); + fmgr_info_copy(hashFunction, &(typeEntry->hash_proc_finfo), CurrentMemoryContext); + + /* calculate hash value */ + hashedValueDatum = FunctionCall1(hashFunction, valueDatum); + + PG_RETURN_INT32(hashedValueDatum); +} diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index d0bbf5e05..b69ec3139 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -513,7 +513,7 @@ TableOwnerResetCommand(Oid relationId) /* - * ShardListInsertCommand generates a singe command that can be + * ShardListInsertCommand generates a single command that can be * executed to replicate shard and shard placement metadata for the * given shard intervals. The function assumes that each shard has a * single placement, and asserts this information. @@ -634,6 +634,37 @@ ShardListInsertCommand(List *shardIntervalList) } +/* + * ShardListDeleteCommand generates a command list that can be executed to delete + * shard and shard placement metadata for the given shard. + */ +List * +ShardDeleteCommandList(ShardInterval *shardInterval) +{ + uint64 shardId = shardInterval->shardId; + List *commandList = NIL; + StringInfo deletePlacementCommand = NULL; + StringInfo deleteShardCommand = NULL; + + /* create command to delete shard placements */ + deletePlacementCommand = makeStringInfo(); + appendStringInfo(deletePlacementCommand, + "DELETE FROM pg_dist_shard_placement WHERE shardid = %lu", + shardId); + + commandList = lappend(commandList, deletePlacementCommand->data); + + /* create command to delete shard */ + deleteShardCommand = makeStringInfo(); + appendStringInfo(deleteShardCommand, + "DELETE FROM pg_dist_shard WHERE shardid = %lu", shardId); + + commandList = lappend(commandList, deleteShardCommand->data); + + return commandList; +} + + /* * NodeDeleteCommand generate a command that can be * executed to delete the metadata for a worker node. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 6b6b846ef..78094bd28 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -95,8 +95,6 @@ static Task * RouterModifyTask(Query *originalQuery, Query *query); static ShardInterval * TargetShardIntervalForModify(Query *query); static List * QueryRestrictList(Query *query); static bool FastShardPruningPossible(CmdType commandType, char partitionMethod); -static ShardInterval * FastShardPruning(Oid distributedTableId, - Const *partionColumnValue); static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); static Task * RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionContext, @@ -1890,7 +1888,8 @@ TargetShardIntervalForModify(Query *query) { uint32 rangeTableId = 1; Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn); + Const *partitionValueConst = ExtractInsertPartitionValue(query, partitionColumn); + Datum partitionValue = partitionValueConst->constvalue; ShardInterval *shardInterval = FastShardPruning(distributedTableId, partitionValue); @@ -2001,8 +2000,8 @@ FastShardPruningPossible(CmdType commandType, char partitionMethod) * the corresponding shard interval that the partitionValue should be in. FastShardPruning * returns NULL if no ShardIntervals exist for the given partitionValue. */ -static ShardInterval * -FastShardPruning(Oid distributedTableId, Const *partitionValue) +ShardInterval * +FastShardPruning(Oid distributedTableId, Datum partitionValue) { DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); int shardCount = cacheEntry->shardIntervalArrayLength; @@ -2030,9 +2029,8 @@ FastShardPruning(Oid distributedTableId, Const *partitionValue) * Call FindShardInterval to find the corresponding shard interval for the * given partition value. */ - shardInterval = FindShardInterval(partitionValue->constvalue, - sortedShardIntervalArray, shardCount, - partitionMethod, + shardInterval = FindShardInterval(partitionValue, sortedShardIntervalArray, + shardCount, partitionMethod, shardIntervalCompareFunction, hashFunction, useBinarySearch); diff --git a/src/backend/distributed/test/connection_cache.c b/src/backend/distributed/test/connection_cache.c index e88204c5a..525abfc50 100644 --- a/src/backend/distributed/test/connection_cache.c +++ b/src/backend/distributed/test/connection_cache.c @@ -28,10 +28,6 @@ #include "utils/lsyscache.h" -/* local function forward declarations */ -static Datum ExtractIntegerDatum(char *input); - - /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(initialize_remote_temp_table); PG_FUNCTION_INFO_V1(count_remote_temp_table_rows); @@ -98,7 +94,7 @@ count_remote_temp_table_rows(PG_FUNCTION_ARGS) else { char *countText = PQgetvalue(result, 0, 0); - count = ExtractIntegerDatum(countText); + count = StringToDatum(countText, INT4OID); } PQclear(result); @@ -189,24 +185,3 @@ set_connection_status_bad(PG_FUNCTION_ARGS) PG_RETURN_BOOL(true); } - - -/* - * ExtractIntegerDatum transforms an integer in textual form into a Datum. - */ -static Datum -ExtractIntegerDatum(char *input) -{ - Oid typIoFunc = InvalidOid; - Oid typIoParam = InvalidOid; - Datum intDatum = 0; - FmgrInfo fmgrInfo; - memset(&fmgrInfo, 0, sizeof(fmgrInfo)); - - getTypeInputInfo(INT4OID, &typIoFunc, &typIoParam); - fmgr_info(typIoFunc, &fmgrInfo); - - intDatum = InputFunctionCall(&fmgrInfo, input, typIoFunc, -1); - - return intDatum; -} diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 6df607a89..f2af93772 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -841,7 +841,11 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) if ((partitionMethod == DISTRIBUTE_BY_APPEND) || (partitionMethod == DISTRIBUTE_BY_RANGE)) { - colocatedShardList = lappend(colocatedShardList, shardInterval); + ShardInterval *copyShardInterval = CitusMakeNode(ShardInterval); + CopyShardInterval(shardInterval, copyShardInterval); + + colocatedShardList = lappend(colocatedShardList, copyShardInterval); + return colocatedShardList; } @@ -857,6 +861,7 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) DistTableCacheEntry *colocatedTableCacheEntry = DistributedTableCacheEntry(colocatedTableId); ShardInterval *colocatedShardInterval = NULL; + ShardInterval *copyShardInterval = NULL; /* * Since we iterate over co-located tables, shard count of each table should be @@ -868,7 +873,10 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) colocatedShardInterval = colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex]; - colocatedShardList = lappend(colocatedShardList, colocatedShardInterval); + copyShardInterval = CitusMakeNode(ShardInterval); + CopyShardInterval(colocatedShardInterval, copyShardInterval); + + colocatedShardList = lappend(colocatedShardList, copyShardInterval); } Assert(list_length(colocatedTableList) == list_length(colocatedShardList)); diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index f8bb96931..169510dc4 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -30,6 +30,7 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_router_planner.h" #include "distributed/pg_dist_node.h" #include "distributed/reference_table_utils.h" #include "distributed/shardinterval_utils.h" @@ -40,6 +41,7 @@ #include "storage/fd.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/relcache.h" @@ -179,20 +181,9 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) Datum get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) { - Oid relationId = InvalidOid; - Datum distributionValue = 0; - - Var *distributionColumn = NULL; - char distributionMethod = 0; - Oid expectedElementType = InvalidOid; - Oid inputElementType = InvalidOid; - DistTableCacheEntry *cacheEntry = NULL; - int shardCount = 0; - ShardInterval **shardIntervalArray = NULL; - FmgrInfo *hashFunction = NULL; - FmgrInfo *compareFunction = NULL; - bool useBinarySearch = true; ShardInterval *shardInterval = NULL; + char distributionMethod = 0; + Oid relationId = InvalidOid; /* * To have optional parameter as NULL, we defined this UDF as not strict, therefore @@ -227,6 +218,13 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) else if (distributionMethod == DISTRIBUTE_BY_HASH || distributionMethod == DISTRIBUTE_BY_RANGE) { + Var *distributionColumn = NULL; + Oid distributionDataType = InvalidOid; + Oid inputDataType = InvalidOid; + char *distributionValueString = NULL; + Datum inputDatum = 0; + Datum distributionValueDatum = 0; + /* if given table is not reference table, distributionValue cannot be NULL */ if (PG_ARGISNULL(1)) { @@ -235,36 +233,17 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) "than reference tables."))); } - distributionValue = PG_GETARG_DATUM(1); + inputDatum = PG_GETARG_DATUM(1); + inputDataType = get_fn_expr_argtype(fcinfo->flinfo, 1); + distributionValueString = DatumToString(inputDatum, inputDataType); distributionColumn = PartitionKey(relationId); - expectedElementType = distributionColumn->vartype; - inputElementType = get_fn_expr_argtype(fcinfo->flinfo, 1); - if (expectedElementType != inputElementType) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("invalid distribution value type"), - errdetail("Type of the value does not match the type of the " - "distribution column. Expected type id: %d, given " - "type id: %d", expectedElementType, - inputElementType))); - } + distributionDataType = distributionColumn->vartype; - cacheEntry = DistributedTableCacheEntry(relationId); + distributionValueDatum = StringToDatum(distributionValueString, + distributionDataType); - if (distributionMethod == DISTRIBUTE_BY_HASH && - cacheEntry->hasUniformHashDistribution) - { - useBinarySearch = false; - } - - shardCount = cacheEntry->shardIntervalArrayLength; - shardIntervalArray = cacheEntry->sortedShardIntervalArray; - hashFunction = cacheEntry->hashFunction; - compareFunction = cacheEntry->shardIntervalCompareFunction; - shardInterval = FindShardInterval(distributionValue, shardIntervalArray, - shardCount, distributionMethod, compareFunction, - hashFunction, useBinarySearch); + shardInterval = FastShardPruning(relationId, distributionValueDatum); } else { @@ -940,3 +919,40 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) return workerNode; } + + +/* + * StringToDatum transforms a string representation into a Datum. + */ +Datum +StringToDatum(char *inputString, Oid dataType) +{ + Oid typIoFunc = InvalidOid; + Oid typIoParam = InvalidOid; + int32 typeModifier = -1; + Datum datum = 0; + + getTypeInputInfo(dataType, &typIoFunc, &typIoParam); + getBaseTypeAndTypmod(dataType, &typeModifier); + + datum = OidInputFunctionCall(typIoFunc, inputString, typIoParam, typeModifier); + + return datum; +} + + +/* + * DatumToString returns the string representation of the given datum. + */ +char * +DatumToString(Datum datum, Oid dataType) +{ + char *outputString = NULL; + Oid typIoFunc = InvalidOid; + bool typIsVarlena = false; + + getTypeOutputInfo(dataType, &typIoFunc, &typIsVarlena); + outputString = OidOutputFunctionCall(typIoFunc, datum); + + return outputString; +} diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 9e24c8570..96aebf045 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -46,4 +46,6 @@ extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword); + + #endif /* CONNECTION_CACHE_H */ diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 8198e677c..74a9f10b5 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -98,5 +98,9 @@ extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSuperUser(void); extern bool TableReferenced(Oid relationId); +extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); +extern Datum StringToDatum(char *inputString, Oid dataType); +extern char * DatumToString(Datum datum, Oid dataType); + #endif /* MASTER_METADATA_UTILITY_H */ diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 72a2baebf..edd0ea653 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -20,6 +20,7 @@ #include "distributed/shardinterval_utils.h" #include "nodes/pg_list.h" +#include "distributed/master_metadata_utility.h" /* @@ -118,6 +119,7 @@ extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, char **tableName); +extern text * IntegerToText(int32 value); /* Function declarations for generating metadata for shard and placement creation */ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); @@ -139,6 +141,7 @@ extern Datum master_drop_all_shards(PG_FUNCTION_ARGS); /* function declarations for shard creation functionality */ extern Datum master_create_worker_shards(PG_FUNCTION_ARGS); +extern Datum isolate_tenant_to_new_shard(PG_FUNCTION_ARGS); /* function declarations for shard repair functionality */ extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index c9fef4e1c..f14a9dcd3 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -28,6 +28,7 @@ extern char * DistributionDeleteCommand(char *schemaName, extern char * TableOwnerResetCommand(Oid distributedRelationId); extern char * NodeListInsertCommand(List *workerNodeList); extern List * ShardListInsertCommand(List *shardIntervalList); +extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern char * NodeDeleteCommand(uint32 nodeId); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); extern char * CreateSchemaDDLCommand(Oid schemaId); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index a215a48df..9fac89427 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -41,6 +41,7 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); +extern ShardInterval * FastShardPruning(Oid distributedTableId, Datum partitionValue); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 6a95e99ca..a8f074be3 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -155,5 +155,8 @@ extern Datum worker_append_table_to_shard(PG_FUNCTION_ARGS); extern Datum worker_foreign_file_path(PG_FUNCTION_ARGS); extern Datum worker_find_block_local_path(PG_FUNCTION_ARGS); +/* Function declaration for calculating hashed value */ +extern Datum worker_hash(PG_FUNCTION_ARGS); + #endif /* WORKER_PROTOCOL_H */ diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index aebb04d65..e4e329b40 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -366,13 +366,13 @@ SELECT create_distributed_table('get_shardid_test_table2', 'column1'); (1 row) \COPY get_shardid_test_table2 FROM STDIN with delimiter '|'; -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'); get_shard_id_for_distribution_column -------------------------------------- 540013 (1 row) -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'); get_shard_id_for_distribution_column -------------------------------------- 540011 @@ -394,9 +394,9 @@ SELECT * FROM get_shardid_test_table2_540011; \c - - - :master_port -- test mismatching data type -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text); -ERROR: invalid distribution value type -DETAIL: Type of the value does not match the type of the distribution column. Expected type id: 1009, given type id: 25 +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'); +ERROR: malformed array literal: "a" +DETAIL: Array value must start with "{" or dimension information. -- test NULL distribution column value for hash distributed table SELECT get_shard_id_for_distribution_column('get_shardid_test_table2'); ERROR: distribution value cannot be NULL for tables other than reference tables. @@ -443,13 +443,13 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1); 540014 (1 row) -SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'); get_shard_id_for_distribution_column -------------------------------------- 540014 (1 row) -SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'); get_shard_id_for_distribution_column -------------------------------------- 540014 diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 38b310c5d..294f70af7 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -74,6 +74,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-13'; ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; +ALTER EXTENSION citus UPDATE TO '6.1-17'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_utilities.out b/src/test/regress/expected/multi_utilities.out index 09e1e5491..737017745 100644 --- a/src/test/regress/expected/multi_utilities.out +++ b/src/test/regress/expected/multi_utilities.out @@ -319,6 +319,29 @@ VACUUM dustbunnies; WARNING: not propagating VACUUM command to worker nodes HINT: Set citus.enable_ddl_propagation to true in order to send targeted VACUUM commands to worker nodes. SET citus.enable_ddl_propagation to DEFAULT; +-- test worker_hash +SELECT worker_hash(123); + worker_hash +------------- + -205084363 +(1 row) + +SELECT worker_hash('1997-08-08'::date); + worker_hash +------------- + -499701663 +(1 row) + +-- test a custom type (this test should run after multi_data_types) +SELECT worker_hash('(1, 2)'); +ERROR: cannot find a hash function for the input type +HINT: Cast input to a data type with a hash function. +SELECT worker_hash('(1, 2)'::test_composite_type); + worker_hash +------------- + -1895345704 +(1 row) + -- TODO: support VERBOSE -- VACUUM VERBOSE dustbunnies; -- VACUUM (FULL, VERBOSE) dustbunnies; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 8e2982a92..6651ab562 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -135,9 +135,9 @@ test: multi_repair_shards test: multi_modifications test: multi_upsert test: multi_simple_queries -test: multi_utilities test: multi_create_insert_proxy test: multi_data_types +test: multi_utilities test: multi_repartition_udt test: multi_repartitioned_subquery_udf test: multi_modifying_xacts diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index 30c0a3653..c647e4eb2 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -90,9 +90,9 @@ test: multi_repair_shards test: multi_modifications test: multi_upsert test: multi_simple_queries -test: multi_utilities test: multi_create_insert_proxy test: multi_data_types +test: multi_utilities # --------- # multi_copy creates hash and range-partitioned tables and performs COPY diff --git a/src/test/regress/sql/multi_distribution_metadata.sql b/src/test/regress/sql/multi_distribution_metadata.sql index 90b642bb7..02c2be792 100644 --- a/src/test/regress/sql/multi_distribution_metadata.sql +++ b/src/test/regress/sql/multi_distribution_metadata.sql @@ -234,8 +234,8 @@ SELECT create_distributed_table('get_shardid_test_table2', 'column1'); {a, b, c}|1 {d, e, f}|2 \. -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'::text[]); -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{a, b, c}'); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f}'); -- verify result of the get_shard_id_for_distribution_column \c - - - :worker_1_port @@ -244,7 +244,7 @@ SELECT * FROM get_shardid_test_table2_540011; \c - - - :master_port -- test mismatching data type -SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'::text); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a'); -- test NULL distribution column value for hash distributed table SELECT get_shard_id_for_distribution_column('get_shardid_test_table2'); @@ -268,8 +268,8 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', NULL); -- test different data types for reference table SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 1); -SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'::text); -SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'::text[]); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', 'a'); +SELECT get_shard_id_for_distribution_column('get_shardid_test_table4', '{a, b, c}'); -- test range distributed table CREATE TABLE get_shardid_test_table5(column1 int, column2 int); diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 38a7560e1..647cedde4 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -74,6 +74,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-13'; ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; +ALTER EXTENSION citus UPDATE TO '6.1-17'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_utilities.sql b/src/test/regress/sql/multi_utilities.sql index e0a078cc9..e85d7ee85 100644 --- a/src/test/regress/sql/multi_utilities.sql +++ b/src/test/regress/sql/multi_utilities.sql @@ -209,6 +209,14 @@ SET citus.enable_ddl_propagation to false; VACUUM dustbunnies; SET citus.enable_ddl_propagation to DEFAULT; +-- test worker_hash +SELECT worker_hash(123); +SELECT worker_hash('1997-08-08'::date); + +-- test a custom type (this test should run after multi_data_types) +SELECT worker_hash('(1, 2)'); +SELECT worker_hash('(1, 2)'::test_composite_type); + -- TODO: support VERBOSE -- VACUUM VERBOSE dustbunnies; -- VACUUM (FULL, VERBOSE) dustbunnies;