Size UDFs implemented

citus_table_size, citus_relation_size and citus_total_relation_size UDFs are implemented.
pull/1267/head
velioglu 2017-03-14 14:30:31 +03:00
parent d9c08c10f4
commit e32aff1a26
10 changed files with 474 additions and 2 deletions

View File

@ -10,7 +10,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
6.2-1
6.2-1 6.2-2
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -132,6 +132,8 @@ $(EXTENSION)--6.1-17.sql: $(EXTENSION)--6.1-16.sql $(EXTENSION)--6.1-16--6.1-17.
cat $^ > $@
$(EXTENSION)--6.2-1.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.2-1.sql
cat $^ > $@
$(EXTENSION)--6.2-2.sql: $(EXTENSION)--6.2-1.sql $(EXTENSION)--6.2-1--6.2-2.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,26 @@
/* citus--6.2-1--6.2-2.sql */
-- Upgrade script from extension version 6.2-1 to 6.2-2. It registers three
-- C-language functions that report the size of a table identified by its
-- regclass. Every function is STRICT, so a NULL argument yields NULL without
-- calling the C code.

-- Create the functions in pg_catalog; the path is restored at the end.
SET search_path = 'pg_catalog';

-- citus_table_size: bound to the C symbol citus_table_size.
CREATE FUNCTION citus_table_size(logicalrelid regclass)
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_table_size$$;
COMMENT ON FUNCTION citus_table_size(logicalrelid regclass)
IS 'get disk space used by the specified table, excluding indexes';

-- citus_relation_size: bound to the C symbol citus_relation_size.
CREATE FUNCTION citus_relation_size(logicalrelid regclass)
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_relation_size$$;
COMMENT ON FUNCTION citus_relation_size(logicalrelid regclass)
IS 'get disk space used by the ''main'' fork';

-- citus_total_relation_size: bound to the C symbol citus_total_relation_size.
CREATE FUNCTION citus_total_relation_size(logicalrelid regclass)
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_total_relation_size$$;
COMMENT ON FUNCTION citus_total_relation_size(logicalrelid regclass)
IS 'get total disk space used by the specified table';

-- Undo the search_path override made at the top of this script.
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.2-1'
default_version = '6.2-2'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -12,6 +12,7 @@
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/htup_details.h"
@ -36,7 +37,10 @@
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_shard_placement.h"
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "nodes/makefuncs.h"
#include "parser/scansup.h"
#include "storage/lmgr.h"
@ -57,6 +61,278 @@ static void RecordDistributedRelationDependencies(Oid distributedRelationId,
Node *distributionKey);
static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
static uint64 DistributedTableSize(Oid relationId, char *sizeQuery);
static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
char *sizeQuery);
static List * ShardIntervalsOnWorkerNode(WorkerNode *workerNode, Oid relationId);
static StringInfo GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId,
List *shardIntervalList,
char *sizeQuery);
static void ErrorIfNotSuitableToGetSize(Oid relationId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(citus_table_size);
PG_FUNCTION_INFO_V1(citus_total_relation_size);
PG_FUNCTION_INFO_V1(citus_relation_size);
/*
 * citus_total_relation_size is the SQL-callable entry point that takes the
 * OID of a distributed table and returns the sum, over all worker nodes, of
 * pg_total_relation_size() evaluated on the table's shards.
 */
Datum
citus_total_relation_size(PG_FUNCTION_ARGS)
{
	Oid distributedTableId = PG_GETARG_OID(0);
	uint64 sizeOnWorkers = DistributedTableSize(distributedTableId,
												PG_TOTAL_RELATION_SIZE_FUNCTION);

	PG_RETURN_INT64(sizeOnWorkers);
}
/*
 * citus_table_size is the SQL-callable entry point that takes the OID of a
 * distributed table and returns the sum, over all worker nodes, of
 * pg_table_size() evaluated on the table's shards (i.e. the disk space used by
 * the table, excluding indexes).
 */
Datum
citus_table_size(PG_FUNCTION_ARGS)
{
	Oid distributedTableId = PG_GETARG_OID(0);
	uint64 sizeOnWorkers = DistributedTableSize(distributedTableId,
												PG_TABLE_SIZE_FUNCTION);

	PG_RETURN_INT64(sizeOnWorkers);
}
/*
 * citus_relation_size is the SQL-callable entry point that takes the OID of a
 * distributed table and returns the sum, over all worker nodes, of
 * pg_relation_size() evaluated on the table's shards (the 'main' fork only).
 */
Datum
citus_relation_size(PG_FUNCTION_ARGS)
{
	Oid distributedTableId = PG_GETARG_OID(0);
	uint64 sizeOnWorkers = DistributedTableSize(distributedTableId,
												PG_RELATION_SIZE_FUNCTION);

	PG_RETURN_INT64(sizeOnWorkers);
}
/*
 * DistributedTableSize is the common implementation behind the citus size
 * UDFs. sizeQuery is a format string (with a single %s) naming the size
 * function to evaluate on the workers.
 *
 * The function refuses to run inside a transaction that already performed
 * multi-shard modifications, validates the relation through
 * ErrorIfNotSuitableToGetSize, and then adds up the size reported for every
 * node returned by WorkerNodeList().
 */
static uint64
DistributedTableSize(Oid relationId, char *sizeQuery)
{
	Relation nodeCatalog = NULL;
	List *workerList = NIL;
	ListCell *workerCell = NULL;
	uint64 sizeSum = 0;

	if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
	{
		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
						errmsg("citus size functions cannot be called in transaction"
							   " blocks which contain multi-shard data modifications")));
	}

	ErrorIfNotSuitableToGetSize(relationId);

	/* hold a share lock on the node catalog while the worker list is walked */
	nodeCatalog = heap_open(DistNodeRelationId(), AccessShareLock);

	workerList = WorkerNodeList();
	foreach(workerCell, workerList)
	{
		WorkerNode *worker = (WorkerNode *) lfirst(workerCell);

		sizeSum += DistributedTableSizeOnWorker(worker, relationId, sizeQuery);
	}

	/* close the relation but do not release the lock here (NoLock) */
	heap_close(nodeCatalog, NoLock);

	return sizeSum;
}
/*
 * DistributedTableSizeOnWorker computes the size of the given relation on a
 * single worker node. It collects the shard intervals that have a placement
 * on that node, builds one query that sums sizeQuery over those shards, runs
 * the query on the worker and parses the single textual result.
 *
 * Errors out when the remote query returns no result (treated as a connection
 * error) or when the returned text is not a valid unsigned 64-bit integer.
 */
static uint64
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery)
{
	StringInfo tableSizeQuery = NULL;
	StringInfo tableSizeStringInfo = NULL;
	char *workerNodeName = workerNode->workerName;
	uint32 workerNodePort = workerNode->workerPort;
	char *tableSizeString = NULL;
	char *parseEnd = NULL;
	List *sizeList = NIL;
	uint64 tableSize = 0;

	List *shardIntervalsOnNode = ShardIntervalsOnWorkerNode(workerNode, relationId);

	tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(relationId,
														   shardIntervalsOnNode,
														   sizeQuery);

	sizeList = ExecuteRemoteQuery(workerNodeName, workerNodePort, NULL, tableSizeQuery);
	if (sizeList == NIL)
	{
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
						errmsg("cannot get the size because of a connection error")));
	}

	tableSizeStringInfo = (StringInfo) linitial(sizeList);
	tableSizeString = tableSizeStringInfo->data;

	/*
	 * Parse with strtoull rather than atol: the result is a 64-bit quantity
	 * and long may be only 32 bits wide. strtoull also lets us detect empty
	 * or malformed text and out-of-range values instead of silently
	 * returning a wrong size.
	 */
	errno = 0;
	tableSize = (uint64) strtoull(tableSizeString, &parseEnd, 10);
	if (parseEnd == tableSizeString || *parseEnd != '\0' || errno == ERANGE)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
						errmsg("cannot parse the size \"%s\" returned by worker "
							   "%s:%u", tableSizeString, workerNodeName,
							   workerNodePort)));
	}

	return tableSize;
}
/*
 * ShardIntervalsOnWorkerNode walks every placement of every shard of the given
 * relation and returns the list of shard intervals that have a placement whose
 * node name and port match the given worker node.
 *
 * A share lock on the shard's distribution metadata is attempted for each
 * placement before the node comparison; when the lock cannot be obtained a
 * WARNING is raised and that placement is skipped, so the shard does not
 * contribute to the size.
 *
 * The list elements point at the shard intervals stored in the table's cache
 * entry; they are not copies.
 *
 * DO NOT modify the shard intervals returned by this function.
 */
static List *
ShardIntervalsOnWorkerNode(WorkerNode *workerNode, Oid relationId)
{
	DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId);
	char *workerNodeName = workerNode->workerName;
	uint32 workerNodePort = workerNode->workerPort;
	List *shardIntervalList = NIL;
	int shardIndex = 0;
	int shardIntervalArrayLength = distTableCacheEntry->shardIntervalArrayLength;

	for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
	{
		ShardPlacement *placementArray =
			distTableCacheEntry->arrayOfPlacementArrays[shardIndex];
		int numberOfPlacements =
			distTableCacheEntry->arrayOfPlacementArrayLengths[shardIndex];
		int placementIndex = 0;

		for (placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
		{
			ShardPlacement *placement = &placementArray[placementIndex];
			char *shardNodeName = placement->nodeName;
			uint32 shardNodePort = placement->nodePort;
			uint64 shardId = placement->shardId;
			bool metadataLock = false;

			metadataLock = TryLockShardDistributionMetadata(shardId, ShareLock);

			/* if the lock is not acquired warn the user */
			if (!metadataLock)
			{
				/*
				 * shardId is a uint64; print it through an explicit
				 * unsigned long long cast so the format specifier matches
				 * the argument on every platform.
				 */
				ereport(WARNING, (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
								  errmsg("lock is not acquired, size of shard %llu "
										 "will be ignored",
										 (unsigned long long) shardId)));
				continue;
			}

			if (strcmp(shardNodeName, workerNodeName) == 0 &&
				shardNodePort == workerNodePort)
			{
				ShardInterval *shardInterval =
					distTableCacheEntry->sortedShardIntervalArray[shardIndex];

				shardIntervalList = lappend(shardIntervalList, shardInterval);
			}
		}
	}

	return shardIntervalList;
}
/*
 * GenerateSizeQueryOnMultiplePlacements builds a single SELECT statement that
 * adds up the size of every shard in shardIntervalList for the relation with
 * distributedRelationId.
 *
 * sizeQuery is used as a format string of the form pg_*_size(%s); the %s is
 * filled with the quoted, schema-qualified shard name. The accepted formats
 * are defined in master_protocol.h. The generated text has the shape
 * "SELECT f('s.t_1') + f('s.t_2') + 0;".
 */
static StringInfo
GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId, List *shardIntervalList,
									  char *sizeQuery)
{
	StringInfo query = makeStringInfo();
	char *namespaceName = get_namespace_name(get_rel_namespace(distributedRelationId));
	ListCell *intervalCell = NULL;

	appendStringInfo(query, "SELECT ");

	foreach(intervalCell, shardIntervalList)
	{
		ShardInterval *interval = (ShardInterval *) lfirst(intervalCell);

		/*
		 * The name is handed to AppendShardIdToName by address, so a fresh
		 * relation name is looked up for every shard.
		 */
		char *shardRelationName = get_rel_name(distributedRelationId);
		char *shardNameLiteral = NULL;

		AppendShardIdToName(&shardRelationName, interval->shardId);
		shardNameLiteral =
			quote_literal_cstr(quote_qualified_identifier(namespaceName,
														  shardRelationName));

		appendStringInfo(query, sizeQuery, shardNameLiteral);
		appendStringInfo(query, " + ");
	}

	/*
	 * A trailing 0 terminates the sum. It keeps the statement valid when the
	 * list is empty and absorbs the " + " appended after the last shard.
	 */
	appendStringInfo(query, "0;");

	return query;
}
/*
 * ErrorIfNotSuitableToGetSize raises an ERROR when the size functions cannot
 * be used on the given relation: either the relation is not distributed, or
 * it is hash-partitioned and not single-replicated. It returns normally
 * otherwise.
 */
static void
ErrorIfNotSuitableToGetSize(Oid relationId)
{
	if (!IsDistributedTable(relationId))
	{
		char *quotedRelationName = quote_literal_cstr(get_rel_name(relationId));

		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						errmsg("cannot calculate the size because relation %s is not "
							   "distributed", quotedRelationName)));
	}

	/* only hash-partitioned tables are subject to the replication check */
	if (PartitionMethod(relationId) != DISTRIBUTE_BY_HASH)
	{
		return;
	}

	if (SingleReplicatedTable(relationId))
	{
		return;
	}

	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot calculate the size because replication factor "
						   "is greater than 1")));
}
/*

View File

@ -25,6 +25,9 @@
/* total number of hash tokens (2^32) */
#define HASH_TOKEN_COUNT INT64CONST(4294967296)
#define SELECT_EXIST_QUERY "SELECT EXISTS (SELECT 1 FROM %s)"
/*
 * Format strings for the size expressions evaluated on worker nodes by the
 * citus size functions. Each contains exactly one %s, which is replaced with
 * the quoted, schema-qualified shard name when the size query is generated.
 */
#define PG_TABLE_SIZE_FUNCTION "pg_table_size(%s)"
#define PG_RELATION_SIZE_FUNCTION "pg_relation_size(%s)"
#define PG_TOTAL_RELATION_SIZE_FUNCTION "pg_total_relation_size(%s)"
/* In-memory representation of a typed tuple in pg_dist_shard. */
typedef struct ShardInterval

View File

@ -76,6 +76,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-15';
ALTER EXTENSION citus UPDATE TO '6.1-16';
ALTER EXTENSION citus UPDATE TO '6.1-17';
ALTER EXTENSION citus UPDATE TO '6.2-1';
ALTER EXTENSION citus UPDATE TO '6.2-2';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,

View File

@ -0,0 +1,106 @@
--
-- MULTI_SIZE_QUERIES
--
-- Test checks whether size of distributed tables can be obtained with citus_table_size.
-- To find the relation size and total relation size citus_relation_size and
-- citus_total_relation_size are also tested.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1390000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1390000;
-- Tests on distributed table with replication factor > 1
SELECT citus_table_size('lineitem_hash_part');
ERROR: cannot calculate the size because replication factor is greater than 1
SELECT citus_relation_size('lineitem_hash_part');
ERROR: cannot calculate the size because replication factor is greater than 1
SELECT citus_total_relation_size('lineitem_hash_part');
ERROR: cannot calculate the size because replication factor is greater than 1
VACUUM (FULL) customer_copy_hash;
-- Tests on distributed tables with streaming replication.
SELECT citus_table_size('customer_copy_hash');
citus_table_size
------------------
548864
(1 row)
SELECT citus_relation_size('customer_copy_hash');
citus_relation_size
---------------------
548864
(1 row)
SELECT citus_total_relation_size('customer_copy_hash');
citus_total_relation_size
---------------------------
1597440
(1 row)
CREATE INDEX index_1 on customer_copy_hash(c_custkey);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
VACUUM (FULL) customer_copy_hash;
-- Tests on distributed table with index.
SELECT citus_table_size('customer_copy_hash');
citus_table_size
------------------
548864
(1 row)
SELECT citus_relation_size('customer_copy_hash');
citus_relation_size
---------------------
548864
(1 row)
SELECT citus_total_relation_size('customer_copy_hash');
citus_total_relation_size
---------------------------
2646016
(1 row)
-- Tests on reference table
VACUUM (FULL) supplier;
SELECT citus_table_size('supplier');
citus_table_size
------------------
376832
(1 row)
SELECT citus_relation_size('supplier');
citus_relation_size
---------------------
376832
(1 row)
SELECT citus_total_relation_size('supplier');
citus_total_relation_size
---------------------------
376832
(1 row)
CREATE INDEX index_2 on supplier(s_suppkey);
VACUUM (FULL) supplier;
SELECT citus_table_size('supplier');
citus_table_size
------------------
376832
(1 row)
SELECT citus_relation_size('supplier');
citus_relation_size
---------------------
376832
(1 row)
SELECT citus_total_relation_size('supplier');
citus_total_relation_size
---------------------------
458752
(1 row)
-- Test inside the transaction
BEGIN;
ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL;
select citus_table_size('supplier');
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
END;
DROP INDEX index_1;
DROP INDEX index_2;

View File

@ -158,6 +158,11 @@ test: multi_router_planner
# ----------
test: multi_large_shardid
# ----------
# multi_size_queries tests various size commands on distributed tables
# ----------
test: multi_size_queries
# ----------
# multi_drop_extension makes sure we can safely drop and recreate the extension
# ----------

View File

@ -76,6 +76,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-15';
ALTER EXTENSION citus UPDATE TO '6.1-16';
ALTER EXTENSION citus UPDATE TO '6.1-17';
ALTER EXTENSION citus UPDATE TO '6.2-1';
ALTER EXTENSION citus UPDATE TO '6.2-2';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)

View File

@ -0,0 +1,52 @@
--
-- MULTI_SIZE_QUERIES
--
-- Test checks whether size of distributed tables can be obtained with citus_table_size.
-- To find the relation size and total relation size citus_relation_size and
-- citus_total_relation_size are also tested.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1390000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1390000;
-- Tests on distributed table with replication factor > 1
SELECT citus_table_size('lineitem_hash_part');
SELECT citus_relation_size('lineitem_hash_part');
SELECT citus_total_relation_size('lineitem_hash_part');
VACUUM (FULL) customer_copy_hash;
-- Tests on distributed tables with streaming replication.
SELECT citus_table_size('customer_copy_hash');
SELECT citus_relation_size('customer_copy_hash');
SELECT citus_total_relation_size('customer_copy_hash');
CREATE INDEX index_1 on customer_copy_hash(c_custkey);
VACUUM (FULL) customer_copy_hash;
-- Tests on distributed table with index.
SELECT citus_table_size('customer_copy_hash');
SELECT citus_relation_size('customer_copy_hash');
SELECT citus_total_relation_size('customer_copy_hash');
-- Tests on reference table
VACUUM (FULL) supplier;
SELECT citus_table_size('supplier');
SELECT citus_relation_size('supplier');
SELECT citus_total_relation_size('supplier');
CREATE INDEX index_2 on supplier(s_suppkey);
VACUUM (FULL) supplier;
SELECT citus_table_size('supplier');
SELECT citus_relation_size('supplier');
SELECT citus_total_relation_size('supplier');
-- Test inside the transaction
BEGIN;
ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL;
select citus_table_size('supplier');
END;
DROP INDEX index_1;
DROP INDEX index_2;