Change native hash function with worker_hash

pull/1324/head
velioglu 2017-04-13 17:13:31 +03:00
parent eef4ed31cb
commit 2327b63291
5 changed files with 235 additions and 156 deletions

View File

@ -651,7 +651,7 @@ AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval
/* generate hashfunc(partCol) expression */ /* generate hashfunc(partCol) expression */
hashFunctionExpr = makeNode(FuncExpr); hashFunctionExpr = makeNode(FuncExpr);
hashFunctionExpr->funcid = typeEntry->hash_proc_finfo.fn_oid; hashFunctionExpr->funcid = CitusWorkerHashFunctionId();
hashFunctionExpr->args = list_make1(targetPartitionColumnVar); hashFunctionExpr->args = list_make1(targetPartitionColumnVar);
/* hash functions always return INT4 */ /* hash functions always return INT4 */

View File

@ -26,8 +26,10 @@
#include "commands/extension.h" #include "commands/extension.h"
#include "commands/trigger.h" #include "commands/trigger.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/pg_dist_local_group.h" #include "distributed/pg_dist_local_group.h"
#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
@ -99,6 +101,7 @@ static Oid distShardPlacementNodeidIndexId = InvalidOid;
static Oid distTransactionRelationId = InvalidOid; static Oid distTransactionRelationId = InvalidOid;
static Oid distTransactionGroupIndexId = InvalidOid; static Oid distTransactionGroupIndexId = InvalidOid;
static Oid extraDataContainerFuncId = InvalidOid; static Oid extraDataContainerFuncId = InvalidOid;
static Oid workerHashFunctionId = InvalidOid;
/* Citus extension version variables */ /* Citus extension version variables */
bool EnableVersionChecks = true; /* version checks are enabled */ bool EnableVersionChecks = true; /* version checks are enabled */
@ -1452,6 +1455,24 @@ CitusExtraDataContainerFuncId(void)
} }
/* return oid of the worker_hash function */
Oid
CitusWorkerHashFunctionId(void)
{
if (workerHashFunctionId == InvalidOid)
{
Oid citusExtensionOid = get_extension_oid("citus", false);
Oid citusSchemaOid = get_extension_schema(citusExtensionOid);
char *citusSchemaName = get_namespace_name(citusSchemaOid);
const int argCount = 1;
workerHashFunctionId = FunctionOid(citusSchemaName, "worker_hash", argCount);
}
return workerHashFunctionId;
}
/* /*
* CitusExtensionOwner() returns the owner of the 'citus' extension. That user * CitusExtensionOwner() returns the owner of the 'citus' extension. That user
* is, amongst others, used to perform actions a normal user might not be * is, amongst others, used to perform actions a normal user might not be
@ -2193,6 +2214,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
distTransactionRelationId = InvalidOid; distTransactionRelationId = InvalidOid;
distTransactionGroupIndexId = InvalidOid; distTransactionGroupIndexId = InvalidOid;
extraDataContainerFuncId = InvalidOid; extraDataContainerFuncId = InvalidOid;
workerHashFunctionId = InvalidOid;
} }
} }

View File

@ -99,6 +99,7 @@ extern Oid DistShardPlacementNodeidIndexId(void);
/* function oids */ /* function oids */
extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusExtraDataContainerFuncId(void);
extern Oid CitusWorkerHashFunctionId(void);
/* user related functions */ /* user related functions */
extern Oid CitusExtensionOwner(void); extern Oid CitusExtensionOwner(void);

File diff suppressed because it is too large Load Diff

View File

@ -1573,6 +1573,29 @@ INSERT INTO text_table (part_col) SELECT val FROM text_table;
INSERT INTO text_table (part_col) SELECT val::text FROM text_table; INSERT INTO text_table (part_col) SELECT val::text FROM text_table;
insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults; insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults;
-- Test on partition column without native hash function
CREATE TABLE raw_table
(
id BIGINT,
time DATE
);
CREATE TABLE summary_table
(
time DATE,
count BIGINT
);
SELECT create_distributed_table('raw_table', 'time');
SELECT create_distributed_table('summary_table', 'time');
INSERT INTO raw_table VALUES(1, '11-11-1980');
INSERT INTO summary_table SELECT time, COUNT(*) FROM raw_table GROUP BY time;
SELECT * FROM summary_table;
DROP TABLE raw_table;
DROP TABLE summary_table;
DROP TABLE raw_events_first CASCADE; DROP TABLE raw_events_first CASCADE;
DROP TABLE raw_events_second; DROP TABLE raw_events_second;
DROP TABLE reference_table; DROP TABLE reference_table;