mirror of https://github.com/citusdata/citus.git
Reorganizes the code
parent
62e1d37623
commit
f43793ed2f
|
@ -0,0 +1,170 @@
|
|||
#include "postgres.h"
|
||||
|
||||
|
||||
#include "fmgr.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/fmgrprotos.h"
|
||||
|
||||
|
||||
#include "access/genam.h"
|
||||
#include "commands/dbcommands.h"
|
||||
#include "distributed/argutils.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/lock_graph.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/pg_dist_database.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
|
||||
|
||||
static int GroupLookupFromDatabase(int64 databaseOid, bool missingOk);
|
||||
PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_name);
|
||||
PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_oid);
|
||||
PG_FUNCTION_INFO_V1(citus_internal_database_size);
|
||||
|
||||
Datum
|
||||
citus_internal_pg_database_size_by_db_name(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
PG_ENSURE_ARGNOTNULL(0, "dbName");
|
||||
|
||||
Name dbName = PG_GETARG_NAME(0);
|
||||
Datum size = DirectFunctionCall1(pg_database_size_name, NameGetDatum(dbName));
|
||||
|
||||
PG_RETURN_DATUM(size);
|
||||
}
|
||||
|
||||
|
||||
Datum
|
||||
citus_internal_pg_database_size_by_db_oid(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
PG_ENSURE_ARGNOTNULL(0, "dbOid");
|
||||
|
||||
Oid dbOid = PG_GETARG_OID(0);
|
||||
Datum size = DirectFunctionCall1(pg_database_size_oid, ObjectIdGetDatum(dbOid));
|
||||
|
||||
PG_RETURN_DATUM(size);
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
GroupLookupFromDatabase(int64 databaseOid, bool missingOk)
|
||||
{
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
Form_pg_dist_database databaseForm = NULL;
|
||||
Relation pgDistDatabase = table_open(PgDistDatabaseRelationId(), AccessShareLock);
|
||||
int groupId = -1;
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_database_databaseid,
|
||||
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(databaseOid));
|
||||
|
||||
SysScanDesc scanDescriptor = systable_beginscan(pgDistDatabase,
|
||||
InvalidOid, true,
|
||||
NULL, scanKeyCount, scanKey);
|
||||
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
if (!HeapTupleIsValid(heapTuple) && !missingOk)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find valid entry for database "
|
||||
UINT64_FORMAT, databaseOid)));
|
||||
}
|
||||
|
||||
if (!HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
groupId = -2;
|
||||
}
|
||||
else
|
||||
{
|
||||
databaseForm = (Form_pg_dist_database) GETSTRUCT(heapTuple);
|
||||
groupId = databaseForm->groupid;
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistDatabase, NoLock);
|
||||
|
||||
return groupId;
|
||||
}
|
||||
|
||||
|
||||
Datum
|
||||
citus_internal_database_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
uint32 connectionFlag = 0;
|
||||
|
||||
|
||||
PGresult *result = NULL;
|
||||
bool failOnError = true;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
PG_ENSURE_ARGNOTNULL(0, "dbName");
|
||||
|
||||
Name dbName = PG_GETARG_NAME(0);
|
||||
elog(INFO, "citus_internal_database_name: %s", dbName->data);
|
||||
|
||||
StringInfo databaseSizeQuery = makeStringInfo();
|
||||
appendStringInfo(databaseSizeQuery,
|
||||
"SELECT citus_internal.pg_database_size_local('%s')",
|
||||
dbName->data);
|
||||
|
||||
/*get database oid */
|
||||
bool missingOk = true;
|
||||
Oid databaseOid = get_database_oid(dbName->data, missingOk);
|
||||
elog(INFO, "citus_internal_database_oid: %d", databaseOid);
|
||||
|
||||
/*get group id */
|
||||
int groupId = GroupLookupFromDatabase(databaseOid, missingOk);
|
||||
if (groupId < 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find valid entry for database %d ",
|
||||
databaseOid)));
|
||||
PG_RETURN_INT64(-1);
|
||||
}
|
||||
elog(INFO, "group id: %d", groupId);
|
||||
|
||||
WorkerNode *workerNode = LookupNodeForGroup(groupId);
|
||||
|
||||
char *workerNodeName = workerNode->workerName;
|
||||
uint32 workerNodePort = workerNode->workerPort;
|
||||
|
||||
elog(INFO, "workerNodeName: %s", workerNodeName);
|
||||
elog(INFO, "workerNodePort: %d", workerNodePort);
|
||||
|
||||
if (groupId == GetLocalGroupId())
|
||||
{
|
||||
/*local database */
|
||||
elog(INFO, "local database");
|
||||
PG_RETURN_INT64(DirectFunctionCall1(citus_internal_pg_database_size_by_db_name,
|
||||
NameGetDatum(dbName)));
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(INFO, "remote database");
|
||||
/*remote database */
|
||||
MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
|
||||
workerNodePort);
|
||||
int queryResult = ExecuteOptionalRemoteCommand(connection,
|
||||
databaseSizeQuery->data,
|
||||
&result);
|
||||
int64 size = 0;
|
||||
|
||||
if (queryResult != 0)
|
||||
{
|
||||
ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("could not connect to %s:%d to get size of "
|
||||
"database \"%s\"",
|
||||
workerNodeName, workerNodePort,
|
||||
dbName->data)));
|
||||
}
|
||||
else
|
||||
{
|
||||
size = ParseIntField(result, 0, 0);
|
||||
}
|
||||
PQclear(result);
|
||||
ClearResults(connection, failOnError);
|
||||
PG_RETURN_INT64(size);
|
||||
}
|
||||
}
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "fmgr.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
|
||||
|
@ -67,7 +66,6 @@
|
|||
#include "distributed/deparser.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/lock_graph.h"
|
||||
#include "distributed/maintenanced.h"
|
||||
#include "distributed/metadata/dependency.h"
|
||||
#include "distributed/metadata/distobject.h"
|
||||
|
@ -80,7 +78,6 @@
|
|||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/pg_dist_colocation.h"
|
||||
#include "distributed/pg_dist_database.h"
|
||||
#include "distributed/pg_dist_node.h"
|
||||
#include "distributed/pg_dist_schema.h"
|
||||
#include "distributed/pg_dist_shard.h"
|
||||
|
@ -158,7 +155,6 @@ static char * RemoteSchemaIdExpressionByName(char *schemaName);
|
|||
static char * RemoteTypeIdExpression(Oid typeId);
|
||||
static char * RemoteCollationIdExpression(Oid colocationId);
|
||||
static char * RemoteTableIdExpression(Oid relationId);
|
||||
static int GroupLookupFromDatabase(int64 databaseOid, bool missingOk);
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
|
||||
|
@ -188,9 +184,6 @@ PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
|
|||
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
|
||||
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
|
||||
PG_FUNCTION_INFO_V1(citus_internal_database_command);
|
||||
PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_name);
|
||||
PG_FUNCTION_INFO_V1(citus_internal_pg_database_size_by_db_oid);
|
||||
PG_FUNCTION_INFO_V1(citus_internal_database_size);
|
||||
|
||||
|
||||
static bool got_SIGTERM = false;
|
||||
|
@ -4056,155 +4049,6 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
Datum
|
||||
citus_internal_pg_database_size_by_db_name(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
PG_ENSURE_ARGNOTNULL(0, "dbName");
|
||||
|
||||
Name dbName = PG_GETARG_NAME(0);
|
||||
Datum size = DirectFunctionCall1(pg_database_size_name, NameGetDatum(dbName));
|
||||
|
||||
PG_RETURN_DATUM(size);
|
||||
}
|
||||
|
||||
|
||||
Datum
|
||||
citus_internal_pg_database_size_by_db_oid(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
PG_ENSURE_ARGNOTNULL(0, "dbOid");
|
||||
|
||||
Oid dbOid = PG_GETARG_OID(0);
|
||||
Datum size = DirectFunctionCall1(pg_database_size_oid, ObjectIdGetDatum(dbOid));
|
||||
|
||||
PG_RETURN_DATUM(size);
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
GroupLookupFromDatabase(int64 databaseOid, bool missingOk)
|
||||
{
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
Form_pg_dist_database databaseForm = NULL;
|
||||
Relation pgDistDatabase = table_open(PgDistDatabaseRelationId(), AccessShareLock);
|
||||
int groupId = -1;
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_database_databaseid,
|
||||
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(databaseOid));
|
||||
|
||||
SysScanDesc scanDescriptor = systable_beginscan(pgDistDatabase,
|
||||
InvalidOid, true,
|
||||
NULL, scanKeyCount, scanKey);
|
||||
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
if (!HeapTupleIsValid(heapTuple) && !missingOk)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find valid entry for database "
|
||||
UINT64_FORMAT, databaseOid)));
|
||||
}
|
||||
|
||||
if (!HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
groupId = -2;
|
||||
}
|
||||
else
|
||||
{
|
||||
databaseForm = (Form_pg_dist_database) GETSTRUCT(heapTuple);
|
||||
groupId = databaseForm->groupid;
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistDatabase, NoLock);
|
||||
|
||||
return groupId;
|
||||
}
|
||||
|
||||
|
||||
Datum
|
||||
citus_internal_database_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
uint32 connectionFlag = 0;
|
||||
|
||||
|
||||
PGresult *result = NULL;
|
||||
bool failOnError = true;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
PG_ENSURE_ARGNOTNULL(0, "dbName");
|
||||
|
||||
Name dbName = PG_GETARG_NAME(0);
|
||||
elog(INFO, "citus_internal_database_name: %s", dbName->data);
|
||||
|
||||
StringInfo databaseSizeQuery = makeStringInfo();
|
||||
appendStringInfo(databaseSizeQuery,
|
||||
"SELECT citus_internal.pg_database_size_local('%s')",
|
||||
dbName->data);
|
||||
|
||||
/*get database oid */
|
||||
bool missingOk = true;
|
||||
Oid databaseOid = get_database_oid(dbName->data, missingOk);
|
||||
elog(INFO, "citus_internal_database_oid: %d", databaseOid);
|
||||
|
||||
/*get group id */
|
||||
int groupId = GroupLookupFromDatabase(databaseOid, missingOk);
|
||||
if (groupId < 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find valid entry for database %d ",
|
||||
databaseOid)));
|
||||
PG_RETURN_INT64(-1);
|
||||
}
|
||||
elog(INFO, "group id: %d", groupId);
|
||||
|
||||
WorkerNode *workerNode = LookupNodeForGroup(groupId);
|
||||
|
||||
char *workerNodeName = workerNode->workerName;
|
||||
uint32 workerNodePort = workerNode->workerPort;
|
||||
|
||||
elog(INFO, "workerNodeName: %s", workerNodeName);
|
||||
elog(INFO, "workerNodePort: %d", workerNodePort);
|
||||
|
||||
if (groupId == GetLocalGroupId())
|
||||
{
|
||||
/*local database */
|
||||
elog(INFO, "local database");
|
||||
PG_RETURN_INT64(DirectFunctionCall1(citus_internal_pg_database_size_by_db_name,
|
||||
NameGetDatum(dbName)));
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(INFO, "remote database");
|
||||
/*remote database */
|
||||
MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
|
||||
workerNodePort);
|
||||
int queryResult = ExecuteOptionalRemoteCommand(connection,
|
||||
databaseSizeQuery->data,
|
||||
&result);
|
||||
int64 size = 0;
|
||||
|
||||
if (queryResult != 0)
|
||||
{
|
||||
ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("could not connect to %s:%d to get size of "
|
||||
"database \"%s\"",
|
||||
workerNodeName, workerNodePort,
|
||||
dbName->data)));
|
||||
}
|
||||
else
|
||||
{
|
||||
size = ParseIntField(result, 0, 0);
|
||||
}
|
||||
PQclear(result);
|
||||
ClearResults(connection, failOnError);
|
||||
PG_RETURN_INT64(size);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
|
||||
*/
|
||||
|
|
|
@ -11,6 +11,8 @@
|
|||
#ifndef CITUS_PG_DIST_DATABASE_H
|
||||
#define CITUS_PG_DIST_DATABASE_H
|
||||
|
||||
#include "catalog/pg_namespace_d.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
||||
typedef struct FormData_pg_dist_database
|
||||
{
|
||||
|
@ -27,7 +29,8 @@ typedef struct FormData_pg_dist_database
|
|||
* ----------------
|
||||
*/
|
||||
typedef FormData_pg_dist_database *Form_pg_dist_database;
|
||||
#endif /* CITUS_PG_DIST_DATABASE_H */
|
||||
|
||||
|
||||
#define PgDistDatabaseRelationId() (get_relname_relid("pg_dist_database", \
|
||||
PG_CATALOG_NAMESPACE))
|
||||
#endif /* CITUS_PG_DIST_DATABASE_H */
|
||||
|
|
Loading…
Reference in New Issue