Fixes pipeline test errors

pull/7375/head
gindibay 2023-12-13 00:57:29 +03:00 committed by gurkanindibay
parent ee27508235
commit f8a332952d
5 changed files with 64 additions and 44 deletions

View File

@ -21,6 +21,12 @@ PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_name);
PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_oid);
PG_FUNCTION_INFO_V1(citus_internal_database_size);
/*
* This function obtains the size of a database given its name,
* similar to the function pg_database_size(name).
* However, since we need to override pg_database_size,
* we create this wrapper function to achieve the same functionality.
*/
Datum
citus_internal_pg_database_size_by_db_name(PG_FUNCTION_ARGS)
{
@ -35,6 +41,12 @@ citus_internal_pg_database_size_by_db_name(PG_FUNCTION_ARGS)
}
/*
* This function obtains the size of a database given its oid,
* similar to the function pg_database_size(oid).
* However, since we need to override pg_database_size,
* we create this wrapper function to achieve the same functionality.
*/
Datum
citus_internal_pg_database_size_by_db_oid(PG_FUNCTION_ARGS)
{
@ -49,46 +61,6 @@ citus_internal_pg_database_size_by_db_oid(PG_FUNCTION_ARGS)
}
static int
GroupLookupFromDatabase(int64 databaseOid, bool missingOk)
{
ScanKeyData scanKey[1];
int scanKeyCount = 1;
Form_pg_dist_database databaseForm = NULL;
Relation pgDistDatabase = table_open(PgDistDatabaseRelationId(), AccessShareLock);
int groupId = -1;
ScanKeyInit(&scanKey[0], Anum_pg_dist_database_databaseid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(databaseOid));
SysScanDesc scanDescriptor = systable_beginscan(pgDistDatabase,
InvalidOid, true,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple) && !missingOk)
{
ereport(ERROR, (errmsg("could not find valid entry for database "
UINT64_FORMAT, databaseOid)));
}
if (!HeapTupleIsValid(heapTuple))
{
groupId = -2;
}
else
{
databaseForm = (Form_pg_dist_database) GETSTRUCT(heapTuple);
groupId = databaseForm->groupid;
}
systable_endscan(scanDescriptor);
table_close(pgDistDatabase, NoLock);
return groupId;
}
Datum
citus_internal_database_size(PG_FUNCTION_ARGS)
{
@ -143,6 +115,7 @@ citus_internal_database_size(PG_FUNCTION_ARGS)
else
{
elog(INFO, "remote database");
/*remote database */
MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
workerNodePort);
@ -167,4 +140,47 @@ citus_internal_database_size(PG_FUNCTION_ARGS)
ClearResults(connection, failOnError);
PG_RETURN_INT64(size);
}
/*
* Retrieves the groupId of a distributed database
* using databaseOid from the pg_dist_database table.
*/
static int
GroupLookupFromDatabase(int64 databaseOid, bool missingOk)
{
ScanKeyData scanKey[1];
int scanKeyCount = 1;
Form_pg_dist_database databaseForm = NULL;
Relation pgDistDatabase = table_open(PgDistDatabaseRelationId(), AccessShareLock);
int groupId = -1;
ScanKeyInit(&scanKey[0], Anum_pg_dist_database_databaseid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(databaseOid));
SysScanDesc scanDescriptor = systable_beginscan(pgDistDatabase,
InvalidOid, true,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple) && !missingOk)
{
ereport(ERROR, (errmsg("could not find valid entry for database "
UINT64_FORMAT, databaseOid)));
}
if (!HeapTupleIsValid(heapTuple))
{
groupId = -2;
}
else
{
databaseForm = (Form_pg_dist_database) GETSTRUCT(heapTuple);
groupId = databaseForm->groupid;
}
systable_endscan(scanDescriptor);
table_close(pgDistDatabase, NoLock);
return groupId;
}
}

View File

@ -6,7 +6,7 @@ AS 'MODULE_PATHNAME', $$citus_internal_pg_database_size_by_db_name$$;
COMMENT ON FUNCTION citus_internal.pg_database_size_local(name) IS
'calculates the size of a database in bytes by its name in a multi-node cluster';
CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_oid oid)
CREATE OR REPLACE FUNCTION citus_internal.pg_database_size_local(db_oid oid)
RETURNS bigint
LANGUAGE C
VOLATILE

View File

@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION pg_catalog.pg_dist_database_size(db_name name)
CREATE OR REPLACE FUNCTION pg_catalog.pg_dist_database_size(db_name name)
RETURNS bigint
LANGUAGE C
VOLATILE

View File

@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION pg_catalog.pg_dist_database_size(db_name name)
CREATE OR REPLACE FUNCTION pg_catalog.pg_dist_database_size(db_name name)
RETURNS bigint
LANGUAGE C
VOLATILE

View File

@ -1422,8 +1422,12 @@ ALTER EXTENSION citus UPDATE TO '12.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
| function citus_internal.pg_database_size_local(name) bigint
| function citus_internal.pg_database_size_local(oid) bigint
| function citus_internal_database_command(text) void
(1 row)
| function pg_dist_database_size(name) bigint
| table pg_dist_database
(5 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version