diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 604e5b5d7..6c8f2c9d5 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -67,6 +67,7 @@ #include "distributed/deparser.h" #include "distributed/distribution_column.h" #include "distributed/listutils.h" +#include "distributed/lock_graph.h" #include "distributed/maintenanced.h" #include "distributed/metadata/dependency.h" #include "distributed/metadata/distobject.h" @@ -79,6 +80,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_colocation.h" +#include "distributed/pg_dist_database.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_schema.h" #include "distributed/pg_dist_shard.h" @@ -156,6 +158,7 @@ static char * RemoteSchemaIdExpressionByName(char *schemaName); static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); static char * RemoteTableIdExpression(Oid relationId); +static int GroupLookupFromDatabase(int64 databaseOid, bool missingOk); PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes); @@ -185,8 +188,9 @@ PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata); PG_FUNCTION_INFO_V1(citus_internal_database_command); -PG_FUNCTION_INFO_V1(citus_pg_database_size_by_db_name); -PG_FUNCTION_INFO_V1(citus_pg_database_size_by_db_oid); +PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_name); +PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_oid); +PG_FUNCTION_INFO_V1(citus_internal_database_size); static bool got_SIGTERM = false; @@ -4052,7 +4056,7 @@ citus_internal_database_command(PG_FUNCTION_ARGS) } Datum -citus_pg_database_size_by_db_name(PG_FUNCTION_ARGS){ +citus_internal_pg_database_size_by_db_name(PG_FUNCTION_ARGS){ CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "dbName"); @@ -4065,7 +4069,7 @@ citus_pg_database_size_by_db_name(PG_FUNCTION_ARGS){ } Datum -citus_pg_database_size_by_db_oid(PG_FUNCTION_ARGS){ +citus_internal_pg_database_size_by_db_oid(PG_FUNCTION_ARGS){ CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "dbOid"); @@ -4077,16 +4081,120 @@ citus_pg_database_size_by_db_oid(PG_FUNCTION_ARGS){ } + +static int +GroupLookupFromDatabase(int64 databaseOid, bool missingOk) +{ + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + Form_pg_dist_database databaseForm = NULL; + Relation pgDistDatabase = table_open(PgDistDatabaseRelationId(), AccessShareLock); + int groupId = -1; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_database_databaseid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(databaseOid)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistDatabase, + InvalidOid, true, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple) && !missingOk) + { + ereport(ERROR, (errmsg("could not find valid entry for database " + UINT64_FORMAT, databaseOid))); + } + + if (!HeapTupleIsValid(heapTuple)) + { + groupId = -2; + } + else + { + databaseForm = (Form_pg_dist_database) GETSTRUCT(heapTuple); + groupId = databaseForm->groupid; + } + + systable_endscan(scanDescriptor); + table_close(pgDistDatabase, NoLock); + + return groupId; +} + Datum citus_internal_database_size(PG_FUNCTION_ARGS){ + + uint32 connectionFlag = 0; + + + PGresult *result = NULL; + bool failOnError = true; + CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "dbName"); Name dbName = PG_GETARG_NAME(0); + elog(INFO, "citus_internal_database_name: %s", dbName->data); - int64 size = 0; + StringInfo databaseSizeQuery = makeStringInfo(); + appendStringInfo(databaseSizeQuery, + "SELECT citus_internal.pg_database_size_local('%s')", + dbName->data); + + //get database oid + bool missingOk = true; + Oid databaseOid = get_database_oid(dbName->data, missingOk); + elog(INFO, "citus_internal_database_oid: %d", databaseOid); + + //get group id + int groupId = GroupLookupFromDatabase(databaseOid, missingOk); + if (groupId < 0){ + ereport(ERROR, (errmsg("could not find valid entry for database %d ", databaseOid))); + PG_RETURN_INT64(-1); + } + elog(INFO, "group id: %d",groupId); + + WorkerNode *workerNode = LookupNodeForGroup(groupId); + + char *workerNodeName = workerNode->workerName; + uint32 workerNodePort = workerNode->workerPort; + + elog(INFO, "workerNodeName: %s", workerNodeName); + elog(INFO, "workerNodePort: %d", workerNodePort); + + if (groupId == GetLocalGroupId()){ + //local database + elog(INFO, "local database"); + PG_RETURN_INT64(DirectFunctionCall1(citus_internal_pg_database_size_by_db_name, NameGetDatum(dbName))); + } + else{ + elog(INFO, "remote database"); + //remote database + MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName, + workerNodePort); + int queryResult = ExecuteOptionalRemoteCommand(connection, databaseSizeQuery->data, + &result); + int64 size = 0; + + if (queryResult != 0) + { + ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to %s:%d to get size of " + "database \"%s\"", + workerNodeName, workerNodePort, + dbName->data))); + } + else{ + + size = ParseIntField(result, 0, 0); + + + } + PQclear(result); + ClearResults(connection, failOnError); + PG_RETURN_INT64(size); + } - PG_RETURN_INT64(size); } diff --git a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql index 63c4a527f..960c45a56 100644 --- a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql +++ b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql @@ -3,3 +3,15 @@ #include "udfs/citus_internal_database_command/12.2-1.sql" #include "udfs/citus_add_rebalance_strategy/12.2-1.sql" + +CREATE TABLE citus.pg_dist_database ( + databaseid oid NOT NULL, + groupid integer NOT NULL, + CONSTRAINT pg_dist_database_pkey PRIMARY KEY (databaseid) +); + +ALTER TABLE citus.pg_dist_database SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.pg_dist_database TO public; + +#include "udfs/citus_internal_pg_database_size_local/12.2-1.sql" +-- #include "udfs/citus_pg_dist_database/12.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql index 44ac8d447..a21eda9bf 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql @@ -1,7 +1,12 @@ -- citus--12.2-1--12.1-1 DROP FUNCTION pg_catalog.citus_internal_database_command(text); -DROP FUNCTION pg_catalog.pg_database_size_local(name); -DROP FUNCTION pg_catalog.pg_database_size_local(oid); + #include "../udfs/citus_add_rebalance_strategy/10.1-1.sql" + + +DROP FUNCTION citus_internal.pg_database_size_local(name); +DROP FUNCTION citus_internal.pg_database_size_local(oid); + +DROP FUNCTION pg_catalog.pg_dist_database_size(name); diff --git a/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql index 3e44c3455..9f6d873cc 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql @@ -8,19 +8,3 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command te AS 'MODULE_PATHNAME', $$citus_internal_database_command$$; COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS 'run a database command without transaction block restrictions'; - -CREATE OR REPLACE FUNCTION pg_catalog.pg_database_size_local(db_name name) - RETURNS bigint - LANGUAGE C - VOLATILE -AS 'MODULE_PATHNAME', $$citus_pg_database_size_by_db_name$$; -COMMENT ON FUNCTION pg_catalog.pg_database_size_local(name) IS - 'calculates the size of a database in bytes by its name in a multi-node cluster'; - - CREATE OR REPLACE FUNCTION pg_catalog.pg_database_size_local(db_oid oid) - RETURNS bigint - LANGUAGE C - VOLATILE -AS 'MODULE_PATHNAME', $$citus_pg_database_size_by_db_oid$$; -COMMENT ON FUNCTION pg_catalog.pg_database_size_local(oid) IS - 'calculates the size of a database in bytes by its oid in a multi-node cluster'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql index bd333191e..3537b2bea 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql @@ -9,18 +9,3 @@ AS 'MODULE_PATHNAME', $$citus_internal_database_command$$; COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS 'run a database command without transaction block restrictions'; - CREATE OR REPLACE FUNCTION pg_catalog.pg_database_size_local(db_name name) - RETURNS bigint - LANGUAGE C - VOLATILE -AS 'MODULE_PATHNAME', $$citus_pg_database_size_by_db_name$$; -COMMENT ON FUNCTION pg_catalog.pg_database_size_local(name) IS - 'calculates the size of a database in bytes by its name in a multi-node cluster'; - - CREATE OR REPLACE FUNCTION pg_catalog.pg_database_size_local(db_oid oid) - RETURNS bigint - LANGUAGE C - VOLATILE -AS 'MODULE_PATHNAME', $$citus_pg_database_size_by_db_oid$$; -COMMENT ON FUNCTION pg_catalog.pg_database_size_local(oid) IS - 'calculates the size of a database in bytes by its oid in a multi-node cluster'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_pg_database_size_local/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_pg_database_size_local/12.2-1.sql new file mode 100644 index 000000000..d511592aa --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_pg_database_size_local/12.2-1.sql @@ -0,0 +1,15 @@ +CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_name name) + RETURNS bigint + LANGUAGE C + VOLATILE +AS 'MODULE_PATHNAME', $$citus_internal_pg_database_size_by_db_name$$; +COMMENT ON FUNCTION citus_internal.pg_database_size_local(name) IS + 'calculates the size of a database in bytes by its name in a multi-node cluster'; + +CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_oid oid) + RETURNS bigint + LANGUAGE C + VOLATILE +AS 'MODULE_PATHNAME', $$citus_internal_pg_database_size_by_db_oid$$; +COMMENT ON FUNCTION citus_internal.pg_database_size_local(oid) IS + 'calculates the size of a database in bytes by its oid in a multi-node cluster'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_pg_database_size_local/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_pg_database_size_local/latest.sql new file mode 100644 index 000000000..c30791290 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_pg_database_size_local/latest.sql @@ -0,0 +1,15 @@ +CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_name name) + RETURNS bigint + LANGUAGE C + VOLATILE +AS 'MODULE_PATHNAME', $$citus_internal_pg_database_size_by_db_name$$; +COMMENT ON FUNCTION citus_internal.pg_database_size_local(name) IS + 'calculates the size of a database in bytes by its name in a multi-node cluster'; + + CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_oid oid) + RETURNS bigint + LANGUAGE C + VOLATILE +AS 'MODULE_PATHNAME', $$citus_internal_pg_database_size_by_db_oid$$; +COMMENT ON FUNCTION citus_internal.pg_database_size_local(oid) IS + 'calculates the size of a database in bytes by its oid in a multi-node cluster'; diff --git a/src/backend/distributed/sql/udfs/citus_pg_dist_database/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_pg_dist_database/12.2-1.sql new file mode 100644 index 000000000..ff56783fa --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pg_dist_database/12.2-1.sql @@ -0,0 +1,7 @@ + CREATE OR REPLACE FUNCTION pg_catalog.pg_dist_database_size(db_name name) + RETURNS bigint + LANGUAGE C + VOLATILE +AS 'MODULE_PATHNAME', $$citus_internal_database_size$$; +COMMENT ON FUNCTION pg_catalog.pg_dist_database_size(oid) IS + 'calculates the size of a database in bytes by its name in a multi-node cluster'; diff --git a/src/backend/distributed/sql/udfs/citus_pg_dist_database/latest.sql b/src/backend/distributed/sql/udfs/citus_pg_dist_database/latest.sql new file mode 100644 index 000000000..ff56783fa --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_pg_dist_database/latest.sql @@ -0,0 +1,7 @@ + CREATE OR REPLACE FUNCTION pg_catalog.pg_dist_database_size(db_name name) + RETURNS bigint + LANGUAGE C + VOLATILE +AS 'MODULE_PATHNAME', $$citus_internal_database_size$$; +COMMENT ON FUNCTION pg_catalog.pg_dist_database_size(oid) IS + 'calculates the size of a database in bytes by its name in a multi-node cluster'; diff --git a/src/include/distributed/pg_dist_database.h b/src/include/distributed/pg_dist_database.h new file mode 100644 index 000000000..185e8605b --- /dev/null +++ b/src/include/distributed/pg_dist_database.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_database.h + * definition of the relation that holds distributed database information + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_PG_DIST_DATABASE_H +#define CITUS_PG_DIST_DATABASE_H + + + +typedef struct FormData_pg_dist_database +{ + Oid databaseid; + int groupid; +} FormData_pg_dist_database; + +#define Anum_pg_dist_database_databaseid 1 +#define Anum_pg_dist_database_groupid 2 + +/* ---------------- + * Form_pg_dist_database corresponds to a pointer to a tuple with + * the format of pg_dist_database relation. + * ---------------- + */ +typedef FormData_pg_dist_database *Form_pg_dist_database; +#endif /* CITUS_PG_DIST_DATABASE_H */ + +#define PgDistDatabaseRelationId() (get_relname_relid("pg_dist_database", PG_CATALOG_NAMESPACE))