From f43793ed2ff2eb5c974867f77a4862a720fa4bbf Mon Sep 17 00:00:00 2001 From: gindibay Date: Tue, 12 Dec 2023 22:35:39 +0300 Subject: [PATCH] Reorganizes the code --- .../distributed/commands/database_udf.c | 170 ++++++++++++++++++ .../distributed/metadata/metadata_sync.c | 156 ---------------- src/include/distributed/pg_dist_database.h | 5 +- 3 files changed, 174 insertions(+), 157 deletions(-) create mode 100644 src/backend/distributed/commands/database_udf.c diff --git a/src/backend/distributed/commands/database_udf.c b/src/backend/distributed/commands/database_udf.c new file mode 100644 index 000000000..e94fca791 --- /dev/null +++ b/src/backend/distributed/commands/database_udf.c @@ -0,0 +1,170 @@ +#include "postgres.h" + + +#include "fmgr.h" +#include "utils/fmgroids.h" +#include "utils/fmgrprotos.h" + + +#include "access/genam.h" +#include "commands/dbcommands.h" +#include "distributed/argutils.h" +#include "distributed/connection_management.h" +#include "distributed/lock_graph.h" +#include "distributed/metadata_cache.h" +#include "distributed/pg_dist_database.h" +#include "distributed/remote_commands.h" + + +static int GroupLookupFromDatabase(int64 databaseOid, bool missingOk); +PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_name); +PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_oid); +PG_FUNCTION_INFO_V1(citus_internal_database_size); + +Datum +citus_internal_pg_database_size_by_db_name(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + PG_ENSURE_ARGNOTNULL(0, "dbName"); + + Name dbName = PG_GETARG_NAME(0); + Datum size = DirectFunctionCall1(pg_database_size_name, NameGetDatum(dbName)); + + PG_RETURN_DATUM(size); +} + + +Datum +citus_internal_pg_database_size_by_db_oid(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + PG_ENSURE_ARGNOTNULL(0, "dbOid"); + + Oid dbOid = PG_GETARG_OID(0); + Datum size = DirectFunctionCall1(pg_database_size_oid, ObjectIdGetDatum(dbOid)); + + PG_RETURN_DATUM(size); +} + + +static int +GroupLookupFromDatabase(int64 databaseOid, bool missingOk) +{ + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + Form_pg_dist_database databaseForm = NULL; + Relation pgDistDatabase = table_open(PgDistDatabaseRelationId(), AccessShareLock); + int groupId = -1; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_database_databaseid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(databaseOid)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistDatabase, + InvalidOid, true, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple) && !missingOk) + { + ereport(ERROR, (errmsg("could not find valid entry for database " + UINT64_FORMAT, databaseOid))); + } + + if (!HeapTupleIsValid(heapTuple)) + { + groupId = -2; + } + else + { + databaseForm = (Form_pg_dist_database) GETSTRUCT(heapTuple); + groupId = databaseForm->groupid; + } + + systable_endscan(scanDescriptor); + table_close(pgDistDatabase, NoLock); + + return groupId; +} + + +Datum +citus_internal_database_size(PG_FUNCTION_ARGS) +{ + uint32 connectionFlag = 0; + + + PGresult *result = NULL; + bool failOnError = true; + + CheckCitusVersion(ERROR); + + PG_ENSURE_ARGNOTNULL(0, "dbName"); + + Name dbName = PG_GETARG_NAME(0); + elog(INFO, "citus_internal_database_name: %s", dbName->data); + + StringInfo databaseSizeQuery = makeStringInfo(); + appendStringInfo(databaseSizeQuery, + "SELECT citus_internal.pg_database_size_local('%s')", + dbName->data); + + /*get database oid */ + bool missingOk = true; + Oid databaseOid = get_database_oid(dbName->data, missingOk); + elog(INFO, "citus_internal_database_oid: %d", databaseOid); + + /*get group id */ + int groupId = GroupLookupFromDatabase(databaseOid, missingOk); + if (groupId < 0) + { + ereport(ERROR, (errmsg("could not find valid entry for database %d ", + databaseOid))); + PG_RETURN_INT64(-1); + } + elog(INFO, "group id: %d", groupId); + + WorkerNode *workerNode = LookupNodeForGroup(groupId); + + char *workerNodeName = workerNode->workerName; + uint32 workerNodePort = workerNode->workerPort; + + elog(INFO, "workerNodeName: %s", workerNodeName); + elog(INFO, "workerNodePort: %d", workerNodePort); + + if (groupId == GetLocalGroupId()) + { + /*local database */ + elog(INFO, "local database"); + PG_RETURN_INT64(DirectFunctionCall1(citus_internal_pg_database_size_by_db_name, + NameGetDatum(dbName))); + } + else + { + elog(INFO, "remote database"); + /*remote database */ + MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName, + workerNodePort); + int queryResult = ExecuteOptionalRemoteCommand(connection, + databaseSizeQuery->data, + &result); + int64 size = 0; + + if (queryResult != 0) + { + ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to %s:%d to get size of " + "database \"%s\"", + workerNodeName, workerNodePort, + dbName->data))); + } + else + { + size = ParseIntField(result, 0, 0); + } + PQclear(result); + ClearResults(connection, failOnError); + PG_RETURN_INT64(size); + } +} diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index b2afd4f97..f0be1995b 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -17,7 +17,6 @@ #include "postgres.h" -#include "fmgr.h" #include "miscadmin.h" #include "pgstat.h" @@ -67,7 +66,6 @@ #include "distributed/deparser.h" #include "distributed/distribution_column.h" #include "distributed/listutils.h" -#include "distributed/lock_graph.h" #include "distributed/maintenanced.h" #include "distributed/metadata/dependency.h" #include "distributed/metadata/distobject.h" @@ -80,7 +78,6 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_colocation.h" -#include "distributed/pg_dist_database.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_schema.h" #include "distributed/pg_dist_shard.h" @@ -158,7 +155,6 @@ static char * RemoteSchemaIdExpressionByName(char *schemaName); static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); static char * RemoteTableIdExpression(Oid relationId); -static int GroupLookupFromDatabase(int64 databaseOid, bool missingOk); PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes); @@ -188,9 +184,6 @@ PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata); PG_FUNCTION_INFO_V1(citus_internal_database_command); -PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_name); -PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_oid); -PG_FUNCTION_INFO_V1(citus_internal_database_size); static bool got_SIGTERM = false; @@ -4056,155 +4049,6 @@ citus_internal_database_command(PG_FUNCTION_ARGS) } -Datum -citus_internal_pg_database_size_by_db_name(PG_FUNCTION_ARGS) -{ - CheckCitusVersion(ERROR); - - PG_ENSURE_ARGNOTNULL(0, "dbName"); - - Name dbName = PG_GETARG_NAME(0); - Datum size = DirectFunctionCall1(pg_database_size_name, NameGetDatum(dbName)); - - PG_RETURN_DATUM(size); -} - - -Datum -citus_internal_pg_database_size_by_db_oid(PG_FUNCTION_ARGS) -{ - CheckCitusVersion(ERROR); - - PG_ENSURE_ARGNOTNULL(0, "dbOid"); - - Oid dbOid = PG_GETARG_OID(0); - Datum size = DirectFunctionCall1(pg_database_size_oid, ObjectIdGetDatum(dbOid)); - - PG_RETURN_DATUM(size); -} - - -static int -GroupLookupFromDatabase(int64 databaseOid, bool missingOk) -{ - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - Form_pg_dist_database databaseForm = NULL; - Relation pgDistDatabase = table_open(PgDistDatabaseRelationId(), AccessShareLock); - int groupId = -1; - - ScanKeyInit(&scanKey[0], Anum_pg_dist_database_databaseid, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(databaseOid)); - - SysScanDesc scanDescriptor = systable_beginscan(pgDistDatabase, - InvalidOid, true, - NULL, scanKeyCount, scanKey); - - HeapTuple heapTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(heapTuple) && !missingOk) - { - ereport(ERROR, (errmsg("could not find valid entry for database " - UINT64_FORMAT, databaseOid))); - } - - if (!HeapTupleIsValid(heapTuple)) - { - groupId = -2; - } - else - { - databaseForm = (Form_pg_dist_database) GETSTRUCT(heapTuple); - groupId = databaseForm->groupid; - } - - systable_endscan(scanDescriptor); - table_close(pgDistDatabase, NoLock); - - return groupId; -} - - -Datum -citus_internal_database_size(PG_FUNCTION_ARGS) -{ - uint32 connectionFlag = 0; - - - PGresult *result = NULL; - bool failOnError = true; - - CheckCitusVersion(ERROR); - - PG_ENSURE_ARGNOTNULL(0, "dbName"); - - Name dbName = PG_GETARG_NAME(0); - elog(INFO, "citus_internal_database_name: %s", dbName->data); - - StringInfo databaseSizeQuery = makeStringInfo(); - appendStringInfo(databaseSizeQuery, - "SELECT citus_internal.pg_database_size_local('%s')", - dbName->data); - - /*get database oid */ - bool missingOk = true; - Oid databaseOid = get_database_oid(dbName->data, missingOk); - elog(INFO, "citus_internal_database_oid: %d", databaseOid); - - /*get group id */ - int groupId = GroupLookupFromDatabase(databaseOid, missingOk); - if (groupId < 0) - { - ereport(ERROR, (errmsg("could not find valid entry for database %d ", - databaseOid))); - PG_RETURN_INT64(-1); - } - elog(INFO, "group id: %d", groupId); - - WorkerNode *workerNode = LookupNodeForGroup(groupId); - - char *workerNodeName = workerNode->workerName; - uint32 workerNodePort = workerNode->workerPort; - - elog(INFO, "workerNodeName: %s", workerNodeName); - elog(INFO, "workerNodePort: %d", workerNodePort); - - if (groupId == GetLocalGroupId()) - { - /*local database */ - elog(INFO, "local database"); - PG_RETURN_INT64(DirectFunctionCall1(citus_internal_pg_database_size_by_db_name, - NameGetDatum(dbName))); - } - else - { - elog(INFO, "remote database"); - /*remote database */ - MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName, - workerNodePort); - int queryResult = ExecuteOptionalRemoteCommand(connection, - databaseSizeQuery->data, - &result); - int64 size = 0; - - if (queryResult != 0) - { - ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not connect to %s:%d to get size of " - "database \"%s\"", - workerNodeName, workerNodePort, - dbName->data))); - } - else - { - size = ParseIntField(result, 0, 0); - } - PQclear(result); - ClearResults(connection, failOnError); - PG_RETURN_INT64(size); - } -} - - /* * SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker. */ diff --git a/src/include/distributed/pg_dist_database.h b/src/include/distributed/pg_dist_database.h index e935014e5..12983b94e 100644 --- a/src/include/distributed/pg_dist_database.h +++ b/src/include/distributed/pg_dist_database.h @@ -11,6 +11,8 @@ #ifndef CITUS_PG_DIST_DATABASE_H #define CITUS_PG_DIST_DATABASE_H +#include "catalog/pg_namespace_d.h" +#include "utils/lsyscache.h" typedef struct FormData_pg_dist_database { @@ -27,7 +29,8 @@ typedef struct FormData_pg_dist_database * ---------------- */ typedef FormData_pg_dist_database *Form_pg_dist_database; -#endif /* CITUS_PG_DIST_DATABASE_H */ + #define PgDistDatabaseRelationId() (get_relname_relid("pg_dist_database", \ PG_CATALOG_NAMESPACE)) +#endif /* CITUS_PG_DIST_DATABASE_H */