From e32aff1a26da096478ed69f4a860d6e1ef8723bd Mon Sep 17 00:00:00 2001 From: velioglu Date: Tue, 14 Mar 2017 14:30:31 +0300 Subject: [PATCH] Size UDFs implemented citus_table_size, citus_relation_size and citus_total_relation_size UDFs are implemented. --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.2-1--6.2-2.sql | 26 ++ src/backend/distributed/citus.control | 2 +- .../master/master_metadata_utility.c | 276 ++++++++++++++++++ .../distributed/master_metadata_utility.h | 3 + src/test/regress/expected/multi_extension.out | 1 + .../regress/expected/multi_size_queries.out | 106 +++++++ src/test/regress/multi_schedule | 5 + src/test/regress/sql/multi_extension.sql | 1 + src/test/regress/sql/multi_size_queries.sql | 52 ++++ 10 files changed, 474 insertions(+), 2 deletions(-) create mode 100644 src/backend/distributed/citus--6.2-1--6.2-2.sql create mode 100644 src/test/regress/expected/multi_size_queries.out create mode 100644 src/test/regress/sql/multi_size_queries.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index c0dd16d94..75c099e3b 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -10,7 +10,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ - 6.2-1 + 6.2-1 6.2-2 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -132,6 +132,8 @@ $(EXTENSION)--6.1-17.sql: $(EXTENSION)--6.1-16.sql $(EXTENSION)--6.1-16--6.1-17. cat $^ > $@ $(EXTENSION)--6.2-1.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.2-1.sql cat $^ > $@ +$(EXTENSION)--6.2-2.sql: $(EXTENSION)--6.2-1.sql $(EXTENSION)--6.2-1--6.2-2.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.2-1--6.2-2.sql b/src/backend/distributed/citus--6.2-1--6.2-2.sql new file mode 100644 index 000000000..55e4bfa18 --- /dev/null +++ b/src/backend/distributed/citus--6.2-1--6.2-2.sql @@ -0,0 +1,26 @@ +/* citus--6.2-1--6.2-2.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION citus_table_size(logicalrelid regclass) + RETURNS bigint + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_table_size$$; +COMMENT ON FUNCTION citus_table_size(logicalrelid regclass) + IS 'get disk space used by the specified table, excluding indexes'; + +CREATE FUNCTION citus_relation_size(logicalrelid regclass) + RETURNS bigint + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_relation_size$$; +COMMENT ON FUNCTION citus_relation_size(logicalrelid regclass) + IS 'get disk space used by the ''main'' fork'; + +CREATE FUNCTION citus_total_relation_size(logicalrelid regclass) + RETURNS bigint + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_total_relation_size$$; +COMMENT ON FUNCTION citus_total_relation_size(logicalrelid regclass) + IS 'get total disk space used by the specified table'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index daccbdeb5..42fc2f856 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.2-1' +default_version = '6.2-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 1ee0bf208..289b59f21 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -12,6 +12,7 @@ #include "postgres.h" #include "funcapi.h" +#include "libpq-fe.h" #include "miscadmin.h" #include "access/htup_details.h" @@ -36,7 +37,10 @@ #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard_placement.h" #include "distributed/relay_utility.h" +#include "distributed/resource_lock.h" +#include "distributed/remote_commands.h" #include "distributed/worker_manager.h" +#include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" #include "parser/scansup.h" #include "storage/lmgr.h" @@ -57,6 +61,278 @@ static void RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey); static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, HeapTuple heapTuple); +static uint64 DistributedTableSize(Oid relationId, char *sizeQuery); +static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, + char *sizeQuery); +static List * ShardIntervalsOnWorkerNode(WorkerNode *workerNode, Oid relationId); +static StringInfo GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId, + List *shardIntervalList, + char *sizeQuery); +static void ErrorIfNotSuitableToGetSize(Oid relationId); + + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(citus_table_size); +PG_FUNCTION_INFO_V1(citus_total_relation_size); +PG_FUNCTION_INFO_V1(citus_relation_size); + + +/* + * citus_total_relation_size accepts a table name and returns a distributed table + * and its indexes' total relation size. + */ +Datum +citus_total_relation_size(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + uint64 totalRelationSize = 0; + + totalRelationSize = DistributedTableSize(relationId, + PG_TOTAL_RELATION_SIZE_FUNCTION); + + PG_RETURN_INT64(totalRelationSize); +} + + +/* + * citus_table_size accepts a table name and returns a distributed table's total + * relation size. + */ +Datum +citus_table_size(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + uint64 tableSize = 0; + + tableSize = DistributedTableSize(relationId, PG_TABLE_SIZE_FUNCTION); + + PG_RETURN_INT64(tableSize); +} + + +/* + * citus_relation_size accept a table name and returns a relation's 'main' + * fork's size. + */ +Datum +citus_relation_size(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + uint64 relationSize = 0; + + relationSize = DistributedTableSize(relationId, PG_RELATION_SIZE_FUNCTION); + + PG_RETURN_INT64(relationSize); +} + + +/* + * DistributedTableSize is helper function for each kind of citus size functions. + * It first checks whether the table is distributed and size query can be run on + * it. Connection to each node has to be established to get the size of the table. + */ +static uint64 +DistributedTableSize(Oid relationId, char *sizeQuery) +{ + Relation pgDistNode = NULL; + List *workerNodeList = NULL; + ListCell *workerNodeCell = NULL; + uint64 totalRelationSize = 0; + + if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("citus size functions cannot be called in transaction" + " blocks which contain multi-shard data modifications"))); + } + + ErrorIfNotSuitableToGetSize(relationId); + + pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + + workerNodeList = WorkerNodeList(); + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + uint64 relationSizeOnNode = DistributedTableSizeOnWorker(workerNode, relationId, + sizeQuery); + totalRelationSize += relationSizeOnNode; + } + + heap_close(pgDistNode, NoLock); + + return totalRelationSize; +} + + +/* + * DistributedTableSizeOnWorker gets the workerNode and relationId to calculate + * size of that relation on the given workerNode by summing up the size of each + * shard placement. + */ +static uint64 +DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery) +{ + StringInfo tableSizeQuery = NULL; + StringInfo tableSizeStringInfo = NULL; + char *workerNodeName = workerNode->workerName; + uint32 workerNodePort = workerNode->workerPort; + char *tableSizeString; + List *sizeList = NIL; + uint64 tableSize = 0; + + List *shardIntervalsOnNode = ShardIntervalsOnWorkerNode(workerNode, relationId); + + tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(relationId, + shardIntervalsOnNode, + sizeQuery); + + sizeList = ExecuteRemoteQuery(workerNodeName, workerNodePort, NULL, tableSizeQuery); + + if (sizeList == NIL) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("cannot get the size because of a connection error"))); + } + + tableSizeStringInfo = (StringInfo) linitial(sizeList); + tableSizeString = tableSizeStringInfo->data; + tableSize = atol(tableSizeString); + + return tableSize; +} + + +/* + * ShardIntervalsOnNode takes a WorkerNode then compares it with each placement + * of table. It returns shard intervals of table on that node as a list of shard + * intervals. Note that, shard intervals returned as elements of the list are + * not the copies but the pointers. + * + * DO NOT modify the shard intervals returned by this function. + */ +static List * +ShardIntervalsOnWorkerNode(WorkerNode *workerNode, Oid relationId) +{ + DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId); + char *workerNodeName = workerNode->workerName; + uint32 workerNodePort = workerNode->workerPort; + List *shardIntervalList = NIL; + int shardIndex = 0; + int shardIntervalArrayLength = distTableCacheEntry->shardIntervalArrayLength; + + for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++) + { + ShardPlacement *placementArray = + distTableCacheEntry->arrayOfPlacementArrays[shardIndex]; + int numberOfPlacements = + distTableCacheEntry->arrayOfPlacementArrayLengths[shardIndex]; + int placementIndex = 0; + + for (placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++) + { + ShardPlacement *placement = &placementArray[placementIndex]; + char *shardNodeName = placement->nodeName; + uint32 shardNodePort = placement->nodePort; + uint64 shardId = placement->shardId; + bool metadataLock = false; + + metadataLock = TryLockShardDistributionMetadata(shardId, ShareLock); + + /* if the lock is not acquired warn the user */ + if (metadataLock == false) + { + ereport(WARNING, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("lock is not acquired, size of shard %ld " + "will be ignored", shardId))); + continue; + } + + if (strcmp(shardNodeName, workerNodeName) == 0 && + shardNodePort == workerNodePort) + { + ShardInterval *shardInterval = + distTableCacheEntry->sortedShardIntervalArray[shardIndex]; + shardIntervalList = lappend(shardIntervalList, shardInterval); + } + } + } + + return shardIntervalList; +} + + +/* + * GenerateSizeQueryOnMultiplePlacements generates a select size query to get + * size of multiple tables from the relation with distributedRelationId. Note + * that, different size functions supported by PG are also supported by this + * function changing the size query given as the last parameter to function. + * Format of sizeQuery is pg_*_size(%s). Examples of it can be found in the + * master_protocol.h + */ +static StringInfo +GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId, List *shardIntervalList, + char *sizeQuery) +{ + Oid schemaId = get_rel_namespace(distributedRelationId); + char *schemaName = get_namespace_name(schemaId); + + StringInfo selectQuery = makeStringInfo(); + ListCell *shardIntervalCell = NULL; + + appendStringInfo(selectQuery, "SELECT "); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + char *shardName = get_rel_name(distributedRelationId); + char *shardQualifiedName = NULL; + char *quotedShardName = NULL; + AppendShardIdToName(&shardName, shardId); + + shardQualifiedName = quote_qualified_identifier(schemaName, shardName); + quotedShardName = quote_literal_cstr(shardQualifiedName); + + appendStringInfo(selectQuery, sizeQuery, quotedShardName); + appendStringInfo(selectQuery, " + "); + } + + /* + * Add 0 as a last size, it handles empty list case and makes size control checks + * unnecessary which would have implemented without this line. + */ + appendStringInfo(selectQuery, "0;"); + + return selectQuery; +} + + +/* + * ErrorIfNotSuitableToGetSize determines whether the table is suitable to find + * its' size with internal functions. + */ +static void +ErrorIfNotSuitableToGetSize(Oid relationId) +{ + if (!IsDistributedTable(relationId)) + { + char *relationName = get_rel_name(relationId); + char *escapedQueryString = quote_literal_cstr(relationName); + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot calculate the size because relation %s is not " + "distributed", escapedQueryString))); + } + + if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH && + !SingleReplicatedTable(relationId)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot calculate the size because replication factor " + "is greater than 1"))); + } +} /* diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 013df30ba..9f08abc4a 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -25,6 +25,9 @@ /* total number of hash tokens (2^32) */ #define HASH_TOKEN_COUNT INT64CONST(4294967296) #define SELECT_EXIST_QUERY "SELECT EXISTS (SELECT 1 FROM %s)" +#define PG_TABLE_SIZE_FUNCTION "pg_table_size(%s)" +#define PG_RELATION_SIZE_FUNCTION "pg_relation_size(%s)" +#define PG_TOTAL_RELATION_SIZE_FUNCTION "pg_total_relation_size(%s)" /* In-memory representation of a typed tuple in pg_dist_shard. */ typedef struct ShardInterval diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 4a52854aa..8655907f8 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -76,6 +76,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; ALTER EXTENSION citus UPDATE TO '6.2-1'; +ALTER EXTENSION citus UPDATE TO '6.2-2'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_size_queries.out b/src/test/regress/expected/multi_size_queries.out new file mode 100644 index 000000000..aae4ea52b --- /dev/null +++ b/src/test/regress/expected/multi_size_queries.out @@ -0,0 +1,106 @@ +-- +-- MULTI_SIZE_QUERIES +-- +-- Test checks whether size of distributed tables can be obtained with citus_table_size. +-- To find the relation size and total relation size citus_relation_size and +-- citus_total_relation_size are also tested. +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1390000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1390000; +-- Tests on distributed table with replication factor > 1 +SELECT citus_table_size('lineitem_hash_part'); +ERROR: cannot calculate the size because replication factor is greater than 1 +SELECT citus_relation_size('lineitem_hash_part'); +ERROR: cannot calculate the size because replication factor is greater than 1 +SELECT citus_total_relation_size('lineitem_hash_part'); +ERROR: cannot calculate the size because replication factor is greater than 1 +VACUUM (FULL) customer_copy_hash; +-- Tests on distributed tables with streaming replication. +SELECT citus_table_size('customer_copy_hash'); + citus_table_size +------------------ + 548864 +(1 row) + +SELECT citus_relation_size('customer_copy_hash'); + citus_relation_size +--------------------- + 548864 +(1 row) + +SELECT citus_total_relation_size('customer_copy_hash'); + citus_total_relation_size +--------------------------- + 1597440 +(1 row) + +CREATE INDEX index_1 on customer_copy_hash(c_custkey); +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +VACUUM (FULL) customer_copy_hash; +-- Tests on distributed table with index. +SELECT citus_table_size('customer_copy_hash'); + citus_table_size +------------------ + 548864 +(1 row) + +SELECT citus_relation_size('customer_copy_hash'); + citus_relation_size +--------------------- + 548864 +(1 row) + +SELECT citus_total_relation_size('customer_copy_hash'); + citus_total_relation_size +--------------------------- + 2646016 +(1 row) + +-- Tests on reference table +VACUUM (FULL) supplier; +SELECT citus_table_size('supplier'); + citus_table_size +------------------ + 376832 +(1 row) + +SELECT citus_relation_size('supplier'); + citus_relation_size +--------------------- + 376832 +(1 row) + +SELECT citus_total_relation_size('supplier'); + citus_total_relation_size +--------------------------- + 376832 +(1 row) + +CREATE INDEX index_2 on supplier(s_suppkey); +VACUUM (FULL) supplier; +SELECT citus_table_size('supplier'); + citus_table_size +------------------ + 376832 +(1 row) + +SELECT citus_relation_size('supplier'); + citus_relation_size +--------------------- + 376832 +(1 row) + +SELECT citus_total_relation_size('supplier'); + citus_total_relation_size +--------------------------- + 458752 +(1 row) + +-- Test inside the transaction +BEGIN; +ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL; +select citus_table_size('supplier'); +ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications +END; +DROP INDEX index_1; +DROP INDEX index_2; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 39593a977..87cc32444 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -158,6 +158,11 @@ test: multi_router_planner # ---------- test: multi_large_shardid +# ---------- +# multi_size_queries tests various size commands on distributed tables +# ---------- +test: multi_size_queries + # ---------- # multi_drop_extension makes sure we can safely drop and recreate the extension # ---------- diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index da41a523a..d1a1ff559 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -76,6 +76,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; ALTER EXTENSION citus UPDATE TO '6.2-1'; +ALTER EXTENSION citus UPDATE TO '6.2-2'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_size_queries.sql b/src/test/regress/sql/multi_size_queries.sql new file mode 100644 index 000000000..24afe7997 --- /dev/null +++ b/src/test/regress/sql/multi_size_queries.sql @@ -0,0 +1,52 @@ +-- +-- MULTI_SIZE_QUERIES +-- +-- Test checks whether size of distributed tables can be obtained with citus_table_size. +-- To find the relation size and total relation size citus_relation_size and +-- citus_total_relation_size are also tested. + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1390000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1390000; + +-- Tests on distributed table with replication factor > 1 +SELECT citus_table_size('lineitem_hash_part'); +SELECT citus_relation_size('lineitem_hash_part'); +SELECT citus_total_relation_size('lineitem_hash_part'); + +VACUUM (FULL) customer_copy_hash; + +-- Tests on distributed tables with streaming replication. +SELECT citus_table_size('customer_copy_hash'); +SELECT citus_relation_size('customer_copy_hash'); +SELECT citus_total_relation_size('customer_copy_hash'); + +CREATE INDEX index_1 on customer_copy_hash(c_custkey); +VACUUM (FULL) customer_copy_hash; + +-- Tests on distributed table with index. +SELECT citus_table_size('customer_copy_hash'); +SELECT citus_relation_size('customer_copy_hash'); +SELECT citus_total_relation_size('customer_copy_hash'); + +-- Tests on reference table +VACUUM (FULL) supplier; + +SELECT citus_table_size('supplier'); +SELECT citus_relation_size('supplier'); +SELECT citus_total_relation_size('supplier'); + +CREATE INDEX index_2 on supplier(s_suppkey); +VACUUM (FULL) supplier; + +SELECT citus_table_size('supplier'); +SELECT citus_relation_size('supplier'); +SELECT citus_total_relation_size('supplier'); + +-- Test inside the transaction +BEGIN; +ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL; +select citus_table_size('supplier'); +END; + +DROP INDEX index_1; +DROP INDEX index_2;