Fix COUNT DISTINCT approximation with schema

Fixes #555

Before this change, we were resolving HLL function and type Oid without qualified name.
Now we find the schema name where HLL objects are stored and generate qualified names for
each objects.

Similar fix is also applied for cstore_table_size function call.
pull/655/head
Burak Yucesoy 2016-07-19 15:21:20 +03:00
parent fdd6c57bdf
commit 20debfc0ee
7 changed files with 144 additions and 15 deletions

View File

@ -26,6 +26,7 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/extension.h" #include "commands/extension.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
@ -125,6 +126,7 @@ static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
static AggregateType GetAggregateType(Oid aggFunctionId); static AggregateType GetAggregateType(Oid aggFunctionId);
static Oid AggregateArgumentType(Aggref *aggregate); static Oid AggregateArgumentType(Aggref *aggregate);
static Oid AggregateFunctionOid(const char *functionName, Oid inputType); static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
static Oid TypeOid(Oid schemaId, const char *typeName);
/* Local functions forward declarations for count(distinct) approximations */ /* Local functions forward declarations for count(distinct) approximations */
static char * CountDistinctHashFunctionName(Oid argumentType); static char * CountDistinctHashFunctionName(Oid argumentType);
@ -1419,11 +1421,18 @@ MasterAggregateExpression(Aggref *originalAggregate,
Aggref *unionAggregate = NULL; Aggref *unionAggregate = NULL;
FuncExpr *cardinalityExpression = NULL; FuncExpr *cardinalityExpression = NULL;
Oid unionFunctionId = FunctionOid(HLL_UNION_AGGREGATE_NAME, argCount); /* extract schema name of hll */
Oid cardinalityFunctionId = FunctionOid(HLL_CARDINALITY_FUNC_NAME, argCount); Oid hllId = get_extension_oid(HLL_EXTENSION_NAME, false);
Oid hllSchemaOid = get_extension_schema(hllId);
const char *hllSchemaName = get_namespace_name(hllSchemaOid);
Oid unionFunctionId = FunctionOid(hllSchemaName, HLL_UNION_AGGREGATE_NAME,
argCount);
Oid cardinalityFunctionId = FunctionOid(hllSchemaName, HLL_CARDINALITY_FUNC_NAME,
argCount);
Oid cardinalityReturnType = get_func_rettype(cardinalityFunctionId); Oid cardinalityReturnType = get_func_rettype(cardinalityFunctionId);
Oid hllType = TypenameGetTypid(HLL_TYPE_NAME); Oid hllType = TypeOid(hllSchemaOid, HLL_TYPE_NAME);
Oid hllTypeCollationId = get_typcollation(hllType); Oid hllTypeCollationId = get_typcollation(hllType);
Var *hllColumn = makeVar(masterTableId, walkerContext->columnId, hllType, Var *hllColumn = makeVar(masterTableId, walkerContext->columnId, hllType,
defaultTypeMod, defaultTypeMod,
@ -1911,13 +1920,20 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
TargetEntry *argument = (TargetEntry *) linitial(originalAggregate->args); TargetEntry *argument = (TargetEntry *) linitial(originalAggregate->args);
Expr *argumentExpression = copyObject(argument->expr); Expr *argumentExpression = copyObject(argument->expr);
/* extract schema name of hll */
Oid hllId = get_extension_oid(HLL_EXTENSION_NAME, false);
Oid hllSchemaOid = get_extension_schema(hllId);
const char *hllSchemaName = get_namespace_name(hllSchemaOid);
char *hashFunctionName = CountDistinctHashFunctionName(argumentType); char *hashFunctionName = CountDistinctHashFunctionName(argumentType);
Oid hashFunctionId = FunctionOid(hashFunctionName, hashArgumentCount); Oid hashFunctionId = FunctionOid(hllSchemaName, hashFunctionName,
hashArgumentCount);
Oid hashFunctionReturnType = get_func_rettype(hashFunctionId); Oid hashFunctionReturnType = get_func_rettype(hashFunctionId);
/* init hll_add_agg() related variables */ /* init hll_add_agg() related variables */
Oid addFunctionId = FunctionOid(HLL_ADD_AGGREGATE_NAME, addArgumentCount); Oid addFunctionId = FunctionOid(hllSchemaName, HLL_ADD_AGGREGATE_NAME,
Oid hllType = TypenameGetTypid(HLL_TYPE_NAME); addArgumentCount);
Oid hllType = TypeOid(hllSchemaOid, HLL_TYPE_NAME);
int logOfStorageSize = CountDistinctStorageSize(CountDistinctErrorRate); int logOfStorageSize = CountDistinctStorageSize(CountDistinctErrorRate);
Const *logOfStorageSizeConst = MakeIntegerConst(logOfStorageSize); Const *logOfStorageSizeConst = MakeIntegerConst(logOfStorageSize);
@ -2103,18 +2119,19 @@ AggregateFunctionOid(const char *functionName, Oid inputType)
* of arguments, and returns the corresponding function's oid. * of arguments, and returns the corresponding function's oid.
*/ */
Oid Oid
FunctionOid(const char *functionName, int argumentCount) FunctionOid(const char *schemaName, const char *functionName, int argumentCount)
{ {
FuncCandidateList functionList = NULL; FuncCandidateList functionList = NULL;
Oid functionOid = InvalidOid; Oid functionOid = InvalidOid;
List *qualifiedFunctionName = stringToQualifiedNameList(functionName); char *qualifiedFunctionName = quote_qualified_identifier(schemaName, functionName);
List *qualifiedFunctionNameList = stringToQualifiedNameList(qualifiedFunctionName);
List *argumentList = NIL; List *argumentList = NIL;
const bool findVariadics = false; const bool findVariadics = false;
const bool findDefaults = false; const bool findDefaults = false;
const bool missingOK = true; const bool missingOK = true;
functionList = FuncnameGetCandidates(qualifiedFunctionName, argumentCount, functionList = FuncnameGetCandidates(qualifiedFunctionNameList, argumentCount,
argumentList, findVariadics, argumentList, findVariadics,
findDefaults, missingOK); findDefaults, missingOK);
@ -2135,6 +2152,22 @@ FunctionOid(const char *functionName, int argumentCount)
} }
/*
* TypeOid looks for a type that has the given name and schema, and returns the
* corresponding type's oid.
*/
static Oid
TypeOid(Oid schemaId, const char *typeName)
{
Oid typeOid;
typeOid = GetSysCacheOid2(TYPENAMENSP, PointerGetDatum(typeName),
ObjectIdGetDatum(schemaId));
return typeOid;
}
/* /*
* CountDistinctHashFunctionName resolves the hll_hash function name to use for * CountDistinctHashFunctionName resolves the hll_hash function name to use for
* the given input type, and returns this function name. * the given input type, and returns this function name.
@ -4393,9 +4426,21 @@ static bool
HasOrderByHllType(List *sortClauseList, List *targetList) HasOrderByHllType(List *sortClauseList, List *targetList)
{ {
bool hasOrderByHllType = false; bool hasOrderByHllType = false;
Oid hllTypeId = TypenameGetTypid(HLL_TYPE_NAME); Oid hllId = InvalidOid;
Oid hllSchemaOid = InvalidOid;
Oid hllTypeId = InvalidOid;
ListCell *sortClauseCell = NULL; ListCell *sortClauseCell = NULL;
/* check whether HLL is loaded */
hllId = get_extension_oid(HLL_EXTENSION_NAME, true);
if (!OidIsValid(hllId))
{
return hasOrderByHllType;
}
hllSchemaOid = get_extension_schema(hllId);
hllTypeId = TypeOid(hllSchemaOid, HLL_TYPE_NAME);
foreach(sortClauseCell, sortClauseList) foreach(sortClauseCell, sortClauseList)
{ {
SortGroupClause *sortClause = (SortGroupClause *) lfirst(sortClauseCell); SortGroupClause *sortClause = (SortGroupClause *) lfirst(sortClauseCell);

View File

@ -53,7 +53,6 @@
#include "utils/typcache.h" #include "utils/typcache.h"
#include "utils/xml.h" #include "utils/xml.h"
static Oid get_extension_schema(Oid ext_oid);
static void AppendOptionListToString(StringInfo stringData, List *options); static void AppendOptionListToString(StringInfo stringData, List *options);
static const char * convert_aclright_to_string(int aclright); static const char * convert_aclright_to_string(int aclright);
@ -101,7 +100,7 @@ pg_get_extensiondef_string(Oid tableRelationId)
* *
* Returns InvalidOid if no such extension. * Returns InvalidOid if no such extension.
*/ */
static Oid Oid
get_extension_schema(Oid ext_oid) get_extension_schema(Oid ext_oid)
{ {
/* *INDENT-OFF* */ /* *INDENT-OFF* */

View File

@ -23,6 +23,8 @@
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "commands/copy.h" #include "commands/copy.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "commands/extension.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
@ -592,8 +594,15 @@ LocalTableSize(Oid relationId)
bool cstoreTable = CStoreTable(relationId); bool cstoreTable = CStoreTable(relationId);
if (cstoreTable) if (cstoreTable)
{ {
/* extract schema name of cstore */
Oid cstoreId = get_extension_oid(CSTORE_FDW_NAME, false);
Oid cstoreSchemaOid = get_extension_schema(cstoreId);
const char *cstoreSchemaName = get_namespace_name(cstoreSchemaOid);
const int tableSizeArgumentCount = 1; const int tableSizeArgumentCount = 1;
Oid tableSizeFunctionOid = FunctionOid(CSTORE_TABLE_SIZE_FUNCTION_NAME,
Oid tableSizeFunctionOid = FunctionOid(cstoreSchemaName,
CSTORE_TABLE_SIZE_FUNCTION_NAME,
tableSizeArgumentCount); tableSizeArgumentCount);
Datum tableSizeDatum = OidFunctionCall1(tableSizeFunctionOid, Datum tableSizeDatum = OidFunctionCall1(tableSizeFunctionOid,
relationIdDatum); relationIdDatum);

View File

@ -22,6 +22,7 @@ extern char * pg_get_tableschemadef_string(Oid tableRelationId);
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId); extern List * pg_get_table_grants(Oid relationId);
extern Oid get_extension_schema(Oid ext_oid);
/* Function declarations for version dependent PostgreSQL ruleutils functions */ /* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer); extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -112,7 +112,8 @@ extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree);
extern char PartitionMethod(Oid relationId); extern char PartitionMethod(Oid relationId);
/* Function declaration for getting oid for the given function name */ /* Function declaration for getting oid for the given function name */
extern Oid FunctionOid(const char *functionName, int argumentCount); extern Oid FunctionOid(const char *schemaName, const char *functionName,
int argumentCount);
/* Function declaration for helper functions in subquery pushdown */ /* Function declaration for helper functions in subquery pushdown */
extern List * SubqueryMultiTableList(MultiNode *multiNode); extern List * SubqueryMultiTableList(MultiNode *multiNode);

View File

@ -1,6 +1,8 @@
-- --
-- MULTI_AGG_APPROXIMATE_DISTINCT -- MULTI_AGG_APPROXIMATE_DISTINCT
-- --
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 340000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 340000;
-- Try to execute count(distinct) when approximate distincts aren't enabled -- Try to execute count(distinct) when approximate distincts aren't enabled
SELECT count(distinct l_orderkey) FROM lineitem; SELECT count(distinct l_orderkey) FROM lineitem;
ERROR: cannot compute aggregate (distinct) ERROR: cannot compute aggregate (distinct)
@ -100,6 +102,47 @@ SELECT count(DISTINCT l_orderkey) as distinct_order_count, l_quantity FROM linei
223 | 31.00 223 | 31.00
(10 rows) (10 rows)
-- Check that approximate count(distinct) works at a table in a schema other than public
-- create necessary objects
CREATE SCHEMA test_count_distinct_schema;
NOTICE: Citus partially supports CREATE SCHEMA for distributed databases
DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet
CREATE TABLE test_count_distinct_schema.nation_hash(
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_count_distinct_schema.nation_hash', 'n_nationkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('test_count_distinct_schema.nation_hash', 4, 2);
master_create_worker_shards
-----------------------------
(1 row)
\COPY test_count_distinct_schema.nation_hash FROM STDIN with delimiter '|';
SET search_path TO public;
SET citus.count_distinct_error_rate TO 0.01;
SELECT COUNT (DISTINCT n_regionkey) FROM test_count_distinct_schema.nation_hash;
count
-------
3
(1 row)
-- test with search_path is set
SET search_path TO test_count_distinct_schema;
SELECT COUNT (DISTINCT n_regionkey) FROM nation_hash;
count
-------
3
(1 row)
SET search_path TO public;
-- If we have an order by on count(distinct) that we intend to push down to -- If we have an order by on count(distinct) that we intend to push down to
-- worker nodes, we need to error out. Otherwise, we are fine. -- worker nodes, we need to error out. Otherwise, we are fine.
SET citus.limit_clause_row_fetch_count = 1000; SET citus.limit_clause_row_fetch_count = 1000;

View File

@ -52,6 +52,37 @@ SELECT count(DISTINCT l_orderkey) as distinct_order_count, l_quantity FROM linei
ORDER BY distinct_order_count ASC, l_quantity ASC ORDER BY distinct_order_count ASC, l_quantity ASC
LIMIT 10; LIMIT 10;
-- Check that approximate count(distinct) works at a table in a schema other than public
-- create necessary objects
CREATE SCHEMA test_count_distinct_schema;
CREATE TABLE test_count_distinct_schema.nation_hash(
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_count_distinct_schema.nation_hash', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_count_distinct_schema.nation_hash', 4, 2);
\COPY test_count_distinct_schema.nation_hash FROM STDIN with delimiter '|';
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
5|ETHIOPIA|0|ven packages wake quickly. regu
\.
SET search_path TO public;
SET citus.count_distinct_error_rate TO 0.01;
SELECT COUNT (DISTINCT n_regionkey) FROM test_count_distinct_schema.nation_hash;
-- test with search_path is set
SET search_path TO test_count_distinct_schema;
SELECT COUNT (DISTINCT n_regionkey) FROM nation_hash;
SET search_path TO public;
-- If we have an order by on count(distinct) that we intend to push down to -- If we have an order by on count(distinct) that we intend to push down to
-- worker nodes, we need to error out. Otherwise, we are fine. -- worker nodes, we need to error out. Otherwise, we are fine.