basic infrastructure for running jobs in a background worker from the maintenance daemon

background-job-details
Nils Dijk 2022-07-07 17:31:35 +02:00 committed by Jelte Fennema
parent 28b04dc9f4
commit a7428c4ce6
10 changed files with 565 additions and 1 deletions

View File

@ -141,6 +141,11 @@ typedef struct MetadataCacheData
bool extensionLoaded; bool extensionLoaded;
Oid distShardRelationId; Oid distShardRelationId;
Oid distPlacementRelationId; Oid distPlacementRelationId;
Oid distRebalanceJobsRelationId;
Oid distRebalanceJobsJobsIndexId;
Oid distRebalanceJobsStatusJobsIndexId;
Oid jobStatusScheduledId;
Oid jobStatusDoneId;
Oid distRebalanceStrategyRelationId; Oid distRebalanceStrategyRelationId;
Oid distNodeRelationId; Oid distNodeRelationId;
Oid distNodeNodeIdIndexId; Oid distNodeNodeIdIndexId;
@ -2364,6 +2369,36 @@ DistLocalGroupIdRelationId(void)
} }
Oid
DistRebalanceJobsRelationId(void)
{
CachedRelationLookup("pg_dist_rebalance_jobs",
&MetadataCache.distRebalanceJobsRelationId);
return MetadataCache.distRebalanceJobsRelationId;
}
Oid
DistRebalanceJobsJobsIdIndexId(void)
{
CachedRelationLookup("pg_dist_rebalance_jobs_jobid_index",
&MetadataCache.distRebalanceJobsJobsIndexId);
return MetadataCache.distRebalanceJobsJobsIndexId;
}
Oid
DistRebalanceJobsStatusJobsIdIndexId(void)
{
CachedRelationLookup("pg_dist_rebalance_jobs_status_jobid_index",
&MetadataCache.distRebalanceJobsStatusJobsIndexId);
return MetadataCache.distRebalanceJobsStatusJobsIndexId;
}
/* return oid of pg_dist_rebalance_strategy relation */ /* return oid of pg_dist_rebalance_strategy relation */
Oid Oid
DistRebalanceStrategyRelationId(void) DistRebalanceStrategyRelationId(void)
@ -3086,6 +3121,32 @@ SecondaryNodeRoleId(void)
} }
Oid
JobStatusScheduledId(void)
{
if (!MetadataCache.jobStatusScheduledId)
{
MetadataCache.jobStatusScheduledId =
LookupStringEnumValueId("citus_job_status", "scheduled");
}
return MetadataCache.jobStatusScheduledId;
}
Oid
JobStatusDoneId(void)
{
if (!MetadataCache.jobStatusDoneId)
{
MetadataCache.jobStatusDoneId =
LookupStringEnumValueId("citus_job_status", "done");
}
return MetadataCache.jobStatusDoneId;
}
/* /*
* citus_dist_partition_cache_invalidate is a trigger function that performs * citus_dist_partition_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_partition are changed * relcache invalidations when the contents of pg_dist_partition are changed

View File

@ -45,6 +45,7 @@
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_rebalance_jobs.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_placement.h" #include "distributed/pg_dist_placement.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
@ -2217,3 +2218,237 @@ IsForeignTable(Oid relationId)
{ {
return get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE; return get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE;
} }
bool
HasScheduledRebalanceJobs()
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
AccessShareLock);
/* pg_dist_rebalance_jobs.status == 'scheduled' */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusScheduledId()));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor);
bool hasScheduledJob = false;
if (HeapTupleIsValid(jobTuple))
{
hasScheduledJob = true;
}
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, AccessShareLock);
return hasScheduledJob;
}
static RebalanceJobStatus
RebalanceJobStatusByOid(Oid enumOid)
{
if (enumOid == JobStatusDoneId())
{
return REBALANCE_JOB_STATUS_DONE;
}
else if (enumOid == JobStatusScheduledId())
{
return REBALANCE_JOB_STATUS_SCHEDULED;
}
ereport(ERROR, (errmsg("unknown enum value for citus_job_status")));
return REBALANCE_JOB_STATUS_UNKNOWN;
}
static Oid
RebalanceJobStatusOid(RebalanceJobStatus status)
{
switch (status)
{
case REBALANCE_JOB_STATUS_SCHEDULED:
{
return JobStatusScheduledId();
}
case REBALANCE_JOB_STATUS_DONE:
{
return JobStatusDoneId();
}
default:
{
return InvalidOid;
}
}
}
static void
ParseMoveJob(RebalanceJob *target, Datum moveArgs)
{
HeapTupleHeader t = DatumGetHeapTupleHeader(moveArgs);
bool isnull = false;
Datum shardIdDatum = GetAttributeByName(t, "shard_id", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"shard_id for citus_move_shard_placement_arguments "
"can't be null")));
}
uint64 shardId = DatumGetUInt64(shardIdDatum);
Datum sourceNodeNameDatum = GetAttributeByName(t, "source_node_name", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"source_node_name for citus_move_shard_placement_arguments "
"can't be null")));
}
text *sourceNodeNameText = DatumGetTextP(sourceNodeNameDatum);
Datum sourceNodePortDatum = GetAttributeByName(t, "source_node_port", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"source_node_port for citus_move_shard_placement_arguments "
"can't be null")));
}
int32 sourceNodePort = DatumGetInt32(sourceNodePortDatum);
Datum targetNodeNameDatum = GetAttributeByName(t, "target_node_name", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"target_node_name for citus_move_shard_placement_arguments "
"can't be null")));
}
text *targetNodeNameText = DatumGetTextP(targetNodeNameDatum);
Datum targetNodePortDatum = GetAttributeByName(t, "target_node_port", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"target_node_port for citus_move_shard_placement_arguments "
"can't be null")));
}
int32 targetNodePort = DatumGetInt32(targetNodePortDatum);
target->jobType = REBALANCE_JOB_TYPE_MOVE;
target->jobArguments.move.shardId = shardId;
target->jobArguments.move.sourceName = text_to_cstring(sourceNodeNameText);
target->jobArguments.move.sourcePort = sourceNodePort;
target->jobArguments.move.targetName = text_to_cstring(targetNodeNameText);
target->jobArguments.move.targetPort = targetNodePort;
}
RebalanceJob *
GetScheduledRebalanceJob(void)
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
AccessShareLock);
/* pg_dist_rebalance_jobs.status == 'scheduled' */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusScheduledId()));
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsStatusJobsIdIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple jobTuple = systable_getnext(scanDescriptor);
RebalanceJob *job = NULL;
if (HeapTupleIsValid(jobTuple))
{
Form_pg_dist_rebalance_job jobData = NULL;
jobData = (Form_pg_dist_rebalance_job) GETSTRUCT(jobTuple);
job = palloc0(sizeof(RebalanceJob));
job->jobid = jobData->jobid;
job->status = RebalanceJobStatusByOid(jobData->status);
/* TODO parse the actual job */
Datum datumArray[Natts_pg_dist_rebalance_jobs];
bool isNullArray[Natts_pg_dist_rebalance_jobs];
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
if (!isNullArray[Anum_pg_dist_rebalance_jobs_citus_move_shard_placement - 1])
{
/* citus_move_shard_placement job */
ParseMoveJob(
job,
datumArray[Anum_pg_dist_rebalance_jobs_citus_move_shard_placement - 1]);
}
else
{
ereport(ERROR, (errmsg("undefined job type")));
}
}
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, AccessShareLock);
return job;
}
void
UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus)
{
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
ScanKeyData scanKey[1];
int scanKeyCount = 1;
/* WHERE jobid = job->jobid */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(job->jobid));
const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsJobsIdIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: "
UINT64_FORMAT, job->jobid)));
}
Datum values[Natts_pg_dist_rebalance_jobs] = { 0 };
bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 };
bool replace[Natts_pg_dist_rebalance_jobs] = { 0 };
values[Anum_pg_dist_rebalance_jobs_status - 1] =
ObjectIdGetDatum(RebalanceJobStatusOid(newStatus));
isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_status - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple);
CommandCounterIncrement();
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, NoLock);
}

View File

@ -1882,6 +1882,16 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.rebalance_check_interval",
gettext_noop("Time to wait between checks for scheduled rebalance jobs."),
NULL,
&RebalanceCheckInterval,
1000, -1, 7 * 24 * 3600 * 1000,
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.recover_2pc_interval", "citus.recover_2pc_interval",
gettext_noop("Sets the time to wait between recovering 2PCs."), gettext_noop("Sets the time to wait between recovering 2PCs."),

View File

@ -76,3 +76,26 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
CREATE TYPE citus.citus_move_shard_placement_arguments AS (
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode
);
ALTER TYPE citus.citus_move_shard_placement_arguments SET SCHEMA pg_catalog;
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'done');
ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_rebalance_jobs(
jobid bigserial NOT NULL,
status pg_catalog.citus_job_status default 'scheduled',
citus_move_shard_placement pg_catalog.citus_move_shard_placement_arguments
);
-- SELECT granted to PUBLIC in upgrade script
ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog;
CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid);
CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid);

View File

@ -93,6 +93,7 @@ typedef struct MaintenanceDaemonDBData
double DistributedDeadlockDetectionTimeoutFactor = 2.0; double DistributedDeadlockDetectionTimeoutFactor = 2.0;
int Recover2PCInterval = 60000; int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000; int DeferShardDeleteInterval = 15000;
int RebalanceCheckInterval = 1000;
/* config variables for metadata sync timeout */ /* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000; int MetadataSyncInterval = 60000;
@ -119,7 +120,8 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg); static void MaintenanceDaemonErrorContext(void *arg);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData); static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void); static void WarnMaintenanceDaemonNotStarted(void);
static BackgroundWorkerHandle * StartRebalanceJobsBackgroundWorker(Oid database,
Oid extensionOwner);
/* /*
* InitializeMaintenanceDaemon, called at server start, is responsible for * InitializeMaintenanceDaemon, called at server start, is responsible for
@ -283,6 +285,9 @@ CitusMaintenanceDaemonMain(Datum main_arg)
TimestampTz lastStatStatementsPurgeTime = 0; TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0; TimestampTz nextMetadataSyncTime = 0;
/* state kept for the reabalance check */
TimestampTz lastRebalanceCheck = 0;
BackgroundWorkerHandle *rebalanceBgwHandle = NULL;
/* /*
* We do metadata sync in a separate background worker. We need its * We do metadata sync in a separate background worker. We need its
@ -683,6 +688,65 @@ CitusMaintenanceDaemonMain(Datum main_arg)
timeout = Min(timeout, (StatStatementsPurgeInterval * 1000)); timeout = Min(timeout, (StatStatementsPurgeInterval * 1000));
} }
pid_t rebalanceWorkerPid = 0;
BgwHandleStatus rebalanceWorkerStatus =
rebalanceBgwHandle != NULL ?
GetBackgroundWorkerPid(rebalanceBgwHandle, &rebalanceWorkerPid) :
BGWH_STOPPED;
if (TimestampDifferenceExceeds(lastRebalanceCheck, GetCurrentTimestamp(),
RebalanceCheckInterval) &&
rebalanceWorkerStatus == BGWH_STOPPED)
{
/* clear old background worker for rebalancing before checking for new jobs */
if (rebalanceBgwHandle)
{
TerminateBackgroundWorker(rebalanceBgwHandle);
pfree(rebalanceBgwHandle);
rebalanceBgwHandle = NULL;
}
StartTransactionCommand();
bool shouldStartRebalanceJobsBackgroundWorker = false;
if (!LockCitusExtension())
{
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
"skipping stat statements purging")));
}
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
/* perform catalog precheck */
shouldStartRebalanceJobsBackgroundWorker = HasScheduledRebalanceJobs();
}
CommitTransactionCommand();
if (shouldStartRebalanceJobsBackgroundWorker)
{
/* spawn background worker */
ereport(LOG, (errmsg("found scheduled job waiting for rebalance. "
"Starting background worker for execution.")));
rebalanceBgwHandle =
StartRebalanceJobsBackgroundWorker(MyDatabaseId,
myDbData->userOid);
if (!rebalanceBgwHandle ||
GetBackgroundWorkerPid(rebalanceBgwHandle, &rebalanceWorkerPid) ==
BGWH_STOPPED)
{
ereport(WARNING, (errmsg("unable to start background worker for "
"rebalance jobs")));
/* todo cooldown timer */
}
}
/* interval management */
lastRebalanceCheck = GetCurrentTimestamp();
timeout = Min(timeout, RebalanceCheckInterval);
}
/* /*
* Wait until timeout, or until somebody wakes us up. Also cast the timeout to * Wait until timeout, or until somebody wakes us up. Also cast the timeout to
* integer where we've calculated it using double for not losing the precision. * integer where we've calculated it using double for not losing the precision.
@ -731,6 +795,100 @@ CitusMaintenanceDaemonMain(Datum main_arg)
} }
static BackgroundWorkerHandle *
StartRebalanceJobsBackgroundWorker(Oid database, Oid extensionOwner)
{
BackgroundWorker worker;
BackgroundWorkerHandle *handle = NULL;
/* Configure a worker. */
memset(&worker, 0, sizeof(worker));
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
"Citus Rebalance Jobs Worker: %u/%u",
database, extensionOwner);
worker.bgw_flags =
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/* don't restart, we manage restarts from maintenance daemon */
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"RebalanceJobsBackgroundWorkerMain");
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
sizeof(Oid));
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
return NULL;
}
pid_t pid;
WaitForBackgroundWorkerStartup(handle, &pid);
return handle;
}
void
RebalanceJobsBackgroundWorkerMain(Datum arg)
{
Oid databaseOid = DatumGetObjectId(arg);
/* extension owner is passed via bgw_extra */
Oid extensionOwner = InvalidOid;
memcpy_s(&extensionOwner, sizeof(extensionOwner),
MyBgworkerEntry->bgw_extra, sizeof(Oid));
BackgroundWorkerUnblockSignals();
/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
/* make worker recognizable in pg_stat_activity */
pgstat_report_appname("rebalance jobs worker");
bool hasJobs = true;
while (hasJobs)
{
CHECK_FOR_INTERRUPTS();
InvalidateMetadataSystemCache();
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
if (!LockCitusExtension())
{
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
"skipping metadata sync")));
}
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
RebalanceJob *job = GetScheduledRebalanceJob();
if (job)
{
ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid)));
/* TODO execute job */
UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE);
}
else
{
hasJobs = false;
}
}
PopActiveSnapshot();
CommitTransactionCommand();
ProcessCompletedNotifies();
}
}
/* /*
* MaintenanceDaemonShmemSize computes how much shared memory is required. * MaintenanceDaemonShmemSize computes how much shared memory is required.
*/ */

View File

@ -30,5 +30,6 @@ extern void InitializeMaintenanceDaemonBackend(void);
extern bool LockCitusExtension(void); extern bool LockCitusExtension(void);
extern void CitusMaintenanceDaemonMain(Datum main_arg); extern void CitusMaintenanceDaemonMain(Datum main_arg);
extern void RebalanceJobsBackgroundWorkerMain(Datum arg);
#endif /* MAINTENANCED_H */ #endif /* MAINTENANCED_H */

View File

@ -227,6 +227,7 @@ extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void); extern Oid DistShardRelationId(void);
extern Oid DistPlacementRelationId(void); extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void); extern Oid DistNodeRelationId(void);
extern Oid DistRebalanceJobsRelationId(void);
extern Oid DistRebalanceStrategyRelationId(void); extern Oid DistRebalanceStrategyRelationId(void);
extern Oid DistLocalGroupIdRelationId(void); extern Oid DistLocalGroupIdRelationId(void);
extern Oid DistObjectRelationId(void); extern Oid DistObjectRelationId(void);
@ -236,6 +237,8 @@ extern Oid DistEnabledCustomAggregatesId(void);
extern Oid DistNodeNodeIdIndexId(void); extern Oid DistNodeNodeIdIndexId(void);
extern Oid DistPartitionLogicalRelidIndexId(void); extern Oid DistPartitionLogicalRelidIndexId(void);
extern Oid DistPartitionColocationidIndexId(void); extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistRebalanceJobsJobsIdIndexId(void);
extern Oid DistRebalanceJobsStatusJobsIdIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void); extern Oid DistShardShardidIndexId(void);
extern Oid DistPlacementShardidIndexId(void); extern Oid DistPlacementShardidIndexId(void);
@ -268,6 +271,8 @@ extern Oid SecondaryNodeRoleId(void);
extern Oid CitusCopyFormatTypeId(void); extern Oid CitusCopyFormatTypeId(void);
extern Oid TextCopyFormatId(void); extern Oid TextCopyFormatId(void);
extern Oid BinaryCopyFormatId(void); extern Oid BinaryCopyFormatId(void);
extern Oid JobStatusDoneId(void);
extern Oid JobStatusScheduledId(void);
/* user related functions */ /* user related functions */
extern Oid CitusExtensionOwner(void); extern Oid CitusExtensionOwner(void);

View File

@ -24,6 +24,7 @@
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/pg_dist_rebalance_jobs.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "utils/acl.h" #include "utils/acl.h"
@ -203,6 +204,38 @@ typedef enum SizeQueryType
TABLE_SIZE /* pg_table_size() */ TABLE_SIZE /* pg_table_size() */
} SizeQueryType; } SizeQueryType;
typedef enum RebalanceJobStatus
{
REBALANCE_JOB_STATUS_UNKNOWN,
REBALANCE_JOB_STATUS_SCHEDULED,
REBALANCE_JOB_STATUS_DONE
} RebalanceJobStatus;
typedef enum RebalanceJobType
{
REBALANCE_JOB_TYPE_UNKNOWN,
REBALANCE_JOB_TYPE_MOVE
} RebalanceJobType;
typedef struct RebalanceJob
{
int64 jobid;
RebalanceJobStatus status;
RebalanceJobType jobType;
union
{
struct
{
uint32 shardId;
char *sourceName;
int32 sourcePort;
char *targetName;
int32 targetPort;
} move;
} jobArguments;
} RebalanceJob;
/* Size functions */ /* Size functions */
extern Datum citus_table_size(PG_FUNCTION_ARGS); extern Datum citus_table_size(PG_FUNCTION_ARGS);
@ -311,4 +344,7 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid
ownerRelationId); ownerRelationId);
extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId); extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasScheduledRebalanceJobs(void);
extern RebalanceJob * GetScheduledRebalanceJob(void);
extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);
#endif /* METADATA_UTILITY_H */ #endif /* METADATA_UTILITY_H */

View File

@ -0,0 +1,34 @@
#ifndef CITUS_PG_DIST_REBALANCE_JOBS_H
#define CITUS_PG_DIST_REBALANCE_JOBS_H
/* ----------------
* pg_dist_rebalance_job definition.
* ----------------
*/
typedef struct FormData_pg_dist_rebalance_job
{
int64 jobid;
Oid status;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text citus_move_shard_placement; /* text? we need to understand how to read a variable length stored custon type */
#endif
} FormData_pg_dist_rebalance_job;
/* ----------------
* Form_pg_dist_colocation corresponds to a pointer to a tuple with
* the format of pg_dist_colocation relation.
* ----------------
*/
typedef FormData_pg_dist_rebalance_job *Form_pg_dist_rebalance_job;
/* ----------------
* compiler constants for pg_dist_rebalance_jobs
* ----------------
*/
#define Natts_pg_dist_rebalance_jobs 3
#define Anum_pg_dist_rebalance_jobs_jobid 1
#define Anum_pg_dist_rebalance_jobs_status 2
#define Anum_pg_dist_rebalance_jobs_citus_move_shard_placement 3
#endif /* CITUS_PG_DIST_REBALANCE_JOBS_H */

View File

@ -13,6 +13,7 @@
/* GUC to configure deferred shard deletion */ /* GUC to configure deferred shard deletion */
extern int DeferShardDeleteInterval; extern int DeferShardDeleteInterval;
extern int RebalanceCheckInterval;
extern bool DeferShardDeleteOnMove; extern bool DeferShardDeleteOnMove;
extern double DesiredPercentFreeAfterMove; extern double DesiredPercentFreeAfterMove;
extern bool CheckAvailableSpaceBeforeMove; extern bool CheckAvailableSpaceBeforeMove;