mirror of https://github.com/citusdata/citus.git
Adds pg_dist_database_size
parent
ae97ac591a
commit
24264f2e42
|
@ -67,6 +67,7 @@
|
||||||
#include "distributed/deparser.h"
|
#include "distributed/deparser.h"
|
||||||
#include "distributed/distribution_column.h"
|
#include "distributed/distribution_column.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
#include "distributed/lock_graph.h"
|
||||||
#include "distributed/maintenanced.h"
|
#include "distributed/maintenanced.h"
|
||||||
#include "distributed/metadata/dependency.h"
|
#include "distributed/metadata/dependency.h"
|
||||||
#include "distributed/metadata/distobject.h"
|
#include "distributed/metadata/distobject.h"
|
||||||
|
@ -79,6 +80,7 @@
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/pg_dist_colocation.h"
|
#include "distributed/pg_dist_colocation.h"
|
||||||
|
#include "distributed/pg_dist_database.h"
|
||||||
#include "distributed/pg_dist_node.h"
|
#include "distributed/pg_dist_node.h"
|
||||||
#include "distributed/pg_dist_schema.h"
|
#include "distributed/pg_dist_schema.h"
|
||||||
#include "distributed/pg_dist_shard.h"
|
#include "distributed/pg_dist_shard.h"
|
||||||
|
@ -156,6 +158,7 @@ static char * RemoteSchemaIdExpressionByName(char *schemaName);
|
||||||
static char * RemoteTypeIdExpression(Oid typeId);
|
static char * RemoteTypeIdExpression(Oid typeId);
|
||||||
static char * RemoteCollationIdExpression(Oid colocationId);
|
static char * RemoteCollationIdExpression(Oid colocationId);
|
||||||
static char * RemoteTableIdExpression(Oid relationId);
|
static char * RemoteTableIdExpression(Oid relationId);
|
||||||
|
static int GroupLookupFromDatabase(int64 databaseOid, bool missingOk);
|
||||||
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
|
PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
|
||||||
|
@ -185,8 +188,9 @@ PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
|
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
|
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_database_command);
|
PG_FUNCTION_INFO_V1(citus_internal_database_command);
|
||||||
PG_FUNCTION_INFO_V1(citus_pg_database_size_by_db_name);
|
PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_name);
|
||||||
PG_FUNCTION_INFO_V1(citus_pg_database_size_by_db_oid);
|
PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_oid);
|
||||||
|
PG_FUNCTION_INFO_V1(citus_internal_database_size);
|
||||||
|
|
||||||
|
|
||||||
static bool got_SIGTERM = false;
|
static bool got_SIGTERM = false;
|
||||||
|
@ -4052,7 +4056,7 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
Datum
|
Datum
|
||||||
citus_pg_database_size_by_db_name(PG_FUNCTION_ARGS){
|
citus_internal_pg_database_size_by_db_name(PG_FUNCTION_ARGS){
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
PG_ENSURE_ARGNOTNULL(0, "dbName");
|
PG_ENSURE_ARGNOTNULL(0, "dbName");
|
||||||
|
@ -4065,7 +4069,7 @@ citus_pg_database_size_by_db_name(PG_FUNCTION_ARGS){
|
||||||
}
|
}
|
||||||
|
|
||||||
Datum
|
Datum
|
||||||
citus_pg_database_size_by_db_oid(PG_FUNCTION_ARGS){
|
citus_internal_pg_database_size_by_db_oid(PG_FUNCTION_ARGS){
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
PG_ENSURE_ARGNOTNULL(0, "dbOid");
|
PG_ENSURE_ARGNOTNULL(0, "dbOid");
|
||||||
|
@ -4077,18 +4081,122 @@ citus_pg_database_size_by_db_oid(PG_FUNCTION_ARGS){
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
GroupLookupFromDatabase(int64 databaseOid, bool missingOk)
|
||||||
|
{
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
int scanKeyCount = 1;
|
||||||
|
Form_pg_dist_database databaseForm = NULL;
|
||||||
|
Relation pgDistDatabase = table_open(PgDistDatabaseRelationId(), AccessShareLock);
|
||||||
|
int groupId = -1;
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_database_databaseid,
|
||||||
|
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(databaseOid));
|
||||||
|
|
||||||
|
SysScanDesc scanDescriptor = systable_beginscan(pgDistDatabase,
|
||||||
|
InvalidOid, true,
|
||||||
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
|
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
if (!HeapTupleIsValid(heapTuple) && !missingOk)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not find valid entry for database "
|
||||||
|
UINT64_FORMAT, databaseOid)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
groupId = -2;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
databaseForm = (Form_pg_dist_database) GETSTRUCT(heapTuple);
|
||||||
|
groupId = databaseForm->groupid;
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
table_close(pgDistDatabase, NoLock);
|
||||||
|
|
||||||
|
return groupId;
|
||||||
|
}
|
||||||
|
|
||||||
Datum citus_internal_database_size(PG_FUNCTION_ARGS){
|
Datum citus_internal_database_size(PG_FUNCTION_ARGS){
|
||||||
|
|
||||||
|
uint32 connectionFlag = 0;
|
||||||
|
|
||||||
|
|
||||||
|
PGresult *result = NULL;
|
||||||
|
bool failOnError = true;
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
PG_ENSURE_ARGNOTNULL(0, "dbName");
|
PG_ENSURE_ARGNOTNULL(0, "dbName");
|
||||||
|
|
||||||
Name dbName = PG_GETARG_NAME(0);
|
Name dbName = PG_GETARG_NAME(0);
|
||||||
|
elog(INFO, "citus_internal_database_name: %s", dbName->data);
|
||||||
|
|
||||||
|
StringInfo databaseSizeQuery = makeStringInfo();
|
||||||
|
appendStringInfo(databaseSizeQuery,
|
||||||
|
"SELECT citus_internal.pg_database_size_local('%s')",
|
||||||
|
dbName->data);
|
||||||
|
|
||||||
|
//get database oid
|
||||||
|
bool missingOk = true;
|
||||||
|
Oid databaseOid = get_database_oid(dbName->data, missingOk);
|
||||||
|
elog(INFO, "citus_internal_database_oid: %d", databaseOid);
|
||||||
|
|
||||||
|
//get group id
|
||||||
|
int groupId = GroupLookupFromDatabase(databaseOid, missingOk);
|
||||||
|
if (groupId < 0){
|
||||||
|
ereport(ERROR, (errmsg("could not find valid entry for database %d ", databaseOid)));
|
||||||
|
PG_RETURN_INT64(-1);
|
||||||
|
}
|
||||||
|
elog(INFO, "group id: %d",groupId);
|
||||||
|
|
||||||
|
WorkerNode *workerNode = LookupNodeForGroup(groupId);
|
||||||
|
|
||||||
|
char *workerNodeName = workerNode->workerName;
|
||||||
|
uint32 workerNodePort = workerNode->workerPort;
|
||||||
|
|
||||||
|
elog(INFO, "workerNodeName: %s", workerNodeName);
|
||||||
|
elog(INFO, "workerNodePort: %d", workerNodePort);
|
||||||
|
|
||||||
|
if (groupId == GetLocalGroupId()){
|
||||||
|
//local database
|
||||||
|
elog(INFO, "local database");
|
||||||
|
PG_RETURN_INT64(DirectFunctionCall1(citus_internal_pg_database_size_by_db_name, NameGetDatum(dbName)));
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
elog(INFO, "remote database");
|
||||||
|
//remote database
|
||||||
|
MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
|
||||||
|
workerNodePort);
|
||||||
|
int queryResult = ExecuteOptionalRemoteCommand(connection, databaseSizeQuery->data,
|
||||||
|
&result);
|
||||||
int64 size = 0;
|
int64 size = 0;
|
||||||
|
|
||||||
|
if (queryResult != 0)
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||||
|
errmsg("could not connect to %s:%d to get size of "
|
||||||
|
"database \"%s\"",
|
||||||
|
workerNodeName, workerNodePort,
|
||||||
|
dbName->data)));
|
||||||
|
}
|
||||||
|
else{
|
||||||
|
|
||||||
|
size = ParseIntField(result, 0, 0);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
PQclear(result);
|
||||||
|
ClearResults(connection, failOnError);
|
||||||
PG_RETURN_INT64(size);
|
PG_RETURN_INT64(size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
|
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
|
||||||
|
|
|
@ -3,3 +3,15 @@
|
||||||
|
|
||||||
#include "udfs/citus_internal_database_command/12.2-1.sql"
|
#include "udfs/citus_internal_database_command/12.2-1.sql"
|
||||||
#include "udfs/citus_add_rebalance_strategy/12.2-1.sql"
|
#include "udfs/citus_add_rebalance_strategy/12.2-1.sql"
|
||||||
|
|
||||||
|
CREATE TABLE citus.pg_dist_database (
|
||||||
|
databaseid oid NOT NULL,
|
||||||
|
groupid integer NOT NULL,
|
||||||
|
CONSTRAINT pg_dist_database_pkey PRIMARY KEY (databaseid)
|
||||||
|
);
|
||||||
|
|
||||||
|
ALTER TABLE citus.pg_dist_database SET SCHEMA pg_catalog;
|
||||||
|
GRANT SELECT ON pg_catalog.pg_dist_database TO public;
|
||||||
|
|
||||||
|
#include "udfs/citus_internal_pg_database_size_local/12.2-1.sql"
|
||||||
|
-- #include "udfs/citus_pg_dist_database/12.2-1.sql"
|
||||||
|
|
|
@ -1,7 +1,12 @@
|
||||||
-- citus--12.2-1--12.1-1
|
-- citus--12.2-1--12.1-1
|
||||||
|
|
||||||
DROP FUNCTION pg_catalog.citus_internal_database_command(text);
|
DROP FUNCTION pg_catalog.citus_internal_database_command(text);
|
||||||
DROP FUNCTION pg_catalog.pg_database_size_local(name);
|
|
||||||
DROP FUNCTION pg_catalog.pg_database_size_local(oid);
|
|
||||||
|
|
||||||
#include "../udfs/citus_add_rebalance_strategy/10.1-1.sql"
|
#include "../udfs/citus_add_rebalance_strategy/10.1-1.sql"
|
||||||
|
|
||||||
|
|
||||||
|
DROP FUNCTION citus_internal.pg_database_size_local(name);
|
||||||
|
DROP FUNCTION citus_internal.pg_database_size_local(oid);
|
||||||
|
|
||||||
|
DROP FUNCTION pg_catalog.pg_dist_database_size(name);
|
||||||
|
|
|
@ -8,19 +8,3 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command te
|
||||||
AS 'MODULE_PATHNAME', $$citus_internal_database_command$$;
|
AS 'MODULE_PATHNAME', $$citus_internal_database_command$$;
|
||||||
COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS
|
COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS
|
||||||
'run a database command without transaction block restrictions';
|
'run a database command without transaction block restrictions';
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION pg_catalog.pg_database_size_local(db_name name)
|
|
||||||
RETURNS bigint
|
|
||||||
LANGUAGE C
|
|
||||||
VOLATILE
|
|
||||||
AS 'MODULE_PATHNAME', $$citus_pg_database_size_by_db_name$$;
|
|
||||||
COMMENT ON FUNCTION pg_catalog.pg_database_size_local(name) IS
|
|
||||||
'calculates the size of a database in bytes by its name in a multi-node cluster';
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION pg_catalog.pg_database_size_local(db_oid oid)
|
|
||||||
RETURNS bigint
|
|
||||||
LANGUAGE C
|
|
||||||
VOLATILE
|
|
||||||
AS 'MODULE_PATHNAME', $$citus_pg_database_size_by_db_oid$$;
|
|
||||||
COMMENT ON FUNCTION pg_catalog.pg_database_size_local(oid) IS
|
|
||||||
'calculates the size of a database in bytes by its oid in a multi-node cluster';
|
|
||||||
|
|
|
@ -9,18 +9,3 @@ AS 'MODULE_PATHNAME', $$citus_internal_database_command$$;
|
||||||
COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS
|
COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS
|
||||||
'run a database command without transaction block restrictions';
|
'run a database command without transaction block restrictions';
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION pg_catalog.pg_database_size_local(db_name name)
|
|
||||||
RETURNS bigint
|
|
||||||
LANGUAGE C
|
|
||||||
VOLATILE
|
|
||||||
AS 'MODULE_PATHNAME', $$citus_pg_database_size_by_db_name$$;
|
|
||||||
COMMENT ON FUNCTION pg_catalog.pg_database_size_local(name) IS
|
|
||||||
'calculates the size of a database in bytes by its name in a multi-node cluster';
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION pg_catalog.pg_database_size_local(db_oid oid)
|
|
||||||
RETURNS bigint
|
|
||||||
LANGUAGE C
|
|
||||||
VOLATILE
|
|
||||||
AS 'MODULE_PATHNAME', $$citus_pg_database_size_by_db_oid$$;
|
|
||||||
COMMENT ON FUNCTION pg_catalog.pg_database_size_local(oid) IS
|
|
||||||
'calculates the size of a database in bytes by its oid in a multi-node cluster';
|
|
||||||
|
|
15
src/backend/distributed/sql/udfs/citus_internal_pg_database_size_local/12.2-1.sql
generated
Normal file
15
src/backend/distributed/sql/udfs/citus_internal_pg_database_size_local/12.2-1.sql
generated
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_name name)
|
||||||
|
RETURNS bigint
|
||||||
|
LANGUAGE C
|
||||||
|
VOLATILE
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_internal_pg_database_size_by_db_name$$;
|
||||||
|
COMMENT ON FUNCTION citus_internal.pg_database_size_local(name) IS
|
||||||
|
'calculates the size of a database in bytes by its name in a multi-node cluster';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_oid oid)
|
||||||
|
RETURNS bigint
|
||||||
|
LANGUAGE C
|
||||||
|
VOLATILE
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_internal_pg_database_size_by_db_oid$$;
|
||||||
|
COMMENT ON FUNCTION citus_internal.pg_database_size_local(oid) IS
|
||||||
|
'calculates the size of a database in bytes by its oid in a multi-node cluster';
|
|
@ -0,0 +1,15 @@
|
||||||
|
CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_name name)
|
||||||
|
RETURNS bigint
|
||||||
|
LANGUAGE C
|
||||||
|
VOLATILE
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_internal_pg_database_size_by_db_name$$;
|
||||||
|
COMMENT ON FUNCTION citus_internal.pg_database_size_local(name) IS
|
||||||
|
'calculates the size of a database in bytes by its name in a multi-node cluster';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_oid oid)
|
||||||
|
RETURNS bigint
|
||||||
|
LANGUAGE C
|
||||||
|
VOLATILE
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_internal_pg_database_size_by_db_oid$$;
|
||||||
|
COMMENT ON FUNCTION citus_internal.pg_database_size_local(oid) IS
|
||||||
|
'calculates the size of a database in bytes by its oid in a multi-node cluster';
|
|
@ -0,0 +1,7 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.pg_dist_database_size(db_name name)
|
||||||
|
RETURNS bigint
|
||||||
|
LANGUAGE C
|
||||||
|
VOLATILE
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_internal_database_size$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.pg_dist_database_size(oid) IS
|
||||||
|
'calculates the size of a database in bytes by its name in a multi-node cluster';
|
|
@ -0,0 +1,7 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.pg_dist_database_size(db_name name)
|
||||||
|
RETURNS bigint
|
||||||
|
LANGUAGE C
|
||||||
|
VOLATILE
|
||||||
|
AS 'MODULE_PATHNAME', $$citus_internal_database_size$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.pg_dist_database_size(oid) IS
|
||||||
|
'calculates the size of a database in bytes by its name in a multi-node cluster';
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* pg_dist_database.h
|
||||||
|
* definition of the relation that holds distributed database information
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef CITUS_PG_DIST_DATABASE_H
|
||||||
|
#define CITUS_PG_DIST_DATABASE_H
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct FormData_pg_dist_database
|
||||||
|
{
|
||||||
|
Oid databaseid;
|
||||||
|
int groupid;
|
||||||
|
} FormData_pg_dist_database;
|
||||||
|
|
||||||
|
#define Anum_pg_dist_database_databaseid 1
|
||||||
|
#define Anum_pg_dist_database_groupid 2
|
||||||
|
|
||||||
|
/* ----------------
|
||||||
|
* Form_pg_dist_database corresponds to a pointer to a tuple with
|
||||||
|
* the format of pg_dist_database relation.
|
||||||
|
* ----------------
|
||||||
|
*/
|
||||||
|
typedef FormData_pg_dist_database *Form_pg_dist_database;
|
||||||
|
#endif /* CITUS_PG_DIST_DATABASE_H */
|
||||||
|
|
||||||
|
#define PgDistDatabaseRelationId() (get_relname_relid("pg_dist_database", PG_CATALOG_NAMESPACE))
|
Loading…
Reference in New Issue