diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 810749f75..811d76c23 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -141,6 +141,11 @@ typedef struct MetadataCacheData bool extensionLoaded; Oid distShardRelationId; Oid distPlacementRelationId; + Oid distRebalanceJobsRelationId; + Oid distRebalanceJobsJobsIndexId; + Oid distRebalanceJobsStatusJobsIndexId; + Oid jobStatusScheduledId; + Oid jobStatusDoneId; Oid distRebalanceStrategyRelationId; Oid distNodeRelationId; Oid distNodeNodeIdIndexId; @@ -2364,6 +2369,36 @@ DistLocalGroupIdRelationId(void) } +Oid +DistRebalanceJobsRelationId(void) +{ + CachedRelationLookup("pg_dist_rebalance_jobs", + &MetadataCache.distRebalanceJobsRelationId); + + return MetadataCache.distRebalanceJobsRelationId; +} + + +Oid +DistRebalanceJobsJobsIdIndexId(void) +{ + CachedRelationLookup("pg_dist_rebalance_jobs_jobid_index", + &MetadataCache.distRebalanceJobsJobsIndexId); + + return MetadataCache.distRebalanceJobsJobsIndexId; +} + + +Oid +DistRebalanceJobsStatusJobsIdIndexId(void) +{ + CachedRelationLookup("pg_dist_rebalance_jobs_status_jobid_index", + &MetadataCache.distRebalanceJobsStatusJobsIndexId); + + return MetadataCache.distRebalanceJobsStatusJobsIndexId; +} + + /* return oid of pg_dist_rebalance_strategy relation */ Oid DistRebalanceStrategyRelationId(void) @@ -3086,6 +3121,32 @@ SecondaryNodeRoleId(void) } +Oid +JobStatusScheduledId(void) +{ + if (!MetadataCache.jobStatusScheduledId) + { + MetadataCache.jobStatusScheduledId = + LookupStringEnumValueId("citus_job_status", "scheduled"); + } + + return MetadataCache.jobStatusScheduledId; +} + + +Oid +JobStatusDoneId(void) +{ + if (!MetadataCache.jobStatusDoneId) + { + MetadataCache.jobStatusDoneId = + LookupStringEnumValueId("citus_job_status", "done"); + } + + return MetadataCache.jobStatusDoneId; +} + + /* * citus_dist_partition_cache_invalidate is a trigger function that performs * relcache invalidations when the contents of pg_dist_partition are changed diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index e65e211c6..00be74921 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -45,6 +45,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" +#include "distributed/pg_dist_rebalance_jobs.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_placement.h" #include "distributed/reference_table_utils.h" @@ -2217,3 +2218,237 @@ IsForeignTable(Oid relationId) { return get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE; } + + +bool +HasScheduledRebalanceJobs() +{ + const int scanKeyCount = 1; + ScanKeyData scanKey[1]; + bool indexOK = true; + + Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + AccessShareLock); + + /* pg_dist_rebalance_jobs.status == 'scheduled' */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusScheduledId())); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, + DistRebalanceJobsStatusJobsIdIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + HeapTuple jobTuple = systable_getnext(scanDescriptor); + + bool hasScheduledJob = false; + if (HeapTupleIsValid(jobTuple)) + { + 
hasScheduledJob = true; + } + + systable_endscan(scanDescriptor); + table_close(pgDistRebalanceJobs, AccessShareLock); + + return hasScheduledJob; +} + + +static RebalanceJobStatus +RebalanceJobStatusByOid(Oid enumOid) +{ + if (enumOid == JobStatusDoneId()) + { + return REBALANCE_JOB_STATUS_DONE; + } + else if (enumOid == JobStatusScheduledId()) + { + return REBALANCE_JOB_STATUS_SCHEDULED; + } + ereport(ERROR, (errmsg("unknown enum value for citus_job_status"))); + return REBALANCE_JOB_STATUS_UNKNOWN; +} + + +static Oid +RebalanceJobStatusOid(RebalanceJobStatus status) +{ + switch (status) + { + case REBALANCE_JOB_STATUS_SCHEDULED: + { + return JobStatusScheduledId(); + } + + case REBALANCE_JOB_STATUS_DONE: + { + return JobStatusDoneId(); + } + + default: + { + return InvalidOid; + } + } +} + + +static void +ParseMoveJob(RebalanceJob *target, Datum moveArgs) +{ + HeapTupleHeader t = DatumGetHeapTupleHeader(moveArgs); + bool isnull = false; + + Datum shardIdDatum = GetAttributeByName(t, "shard_id", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg( + "shard_id for citus_move_shard_placement_arguments " + "can't be null"))); + } + uint64 shardId = DatumGetUInt64(shardIdDatum); + + Datum sourceNodeNameDatum = GetAttributeByName(t, "source_node_name", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg( + "source_node_name for citus_move_shard_placement_arguments " + "can't be null"))); + } + text *sourceNodeNameText = DatumGetTextP(sourceNodeNameDatum); + + Datum sourceNodePortDatum = GetAttributeByName(t, "source_node_port", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg( + "source_node_port for citus_move_shard_placement_arguments " + "can't be null"))); + } + int32 sourceNodePort = DatumGetInt32(sourceNodePortDatum); + + Datum targetNodeNameDatum = GetAttributeByName(t, "target_node_name", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg( + "target_node_name for citus_move_shard_placement_arguments " + "can't be null"))); + } + text *targetNodeNameText = DatumGetTextP(targetNodeNameDatum); + + Datum targetNodePortDatum = GetAttributeByName(t, "target_node_port", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg( + "target_node_port for citus_move_shard_placement_arguments " + "can't be null"))); + } + int32 targetNodePort = DatumGetInt32(targetNodePortDatum); + + target->jobType = REBALANCE_JOB_TYPE_MOVE; + target->jobArguments.move.shardId = shardId; + target->jobArguments.move.sourceName = text_to_cstring(sourceNodeNameText); + target->jobArguments.move.sourcePort = sourceNodePort; + target->jobArguments.move.targetName = text_to_cstring(targetNodeNameText); + target->jobArguments.move.targetPort = targetNodePort; +} + + +RebalanceJob * +GetScheduledRebalanceJob(void) +{ + const int scanKeyCount = 1; + ScanKeyData scanKey[1]; + bool indexOK = true; + + Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + AccessShareLock); + + /* pg_dist_rebalance_jobs.status == 'scheduled' */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_status, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(JobStatusScheduledId())); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, + DistRebalanceJobsStatusJobsIdIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + HeapTuple jobTuple = systable_getnext(scanDescriptor); + RebalanceJob *job = NULL; + if (HeapTupleIsValid(jobTuple)) + { + Form_pg_dist_rebalance_job jobData = NULL; + jobData = (Form_pg_dist_rebalance_job) GETSTRUCT(jobTuple); + + job = palloc0(sizeof(RebalanceJob)); + job->jobid = 
jobData->jobid; + job->status = RebalanceJobStatusByOid(jobData->status); + + /* TODO parse the actual job */ + Datum datumArray[Natts_pg_dist_rebalance_jobs]; + bool isNullArray[Natts_pg_dist_rebalance_jobs]; + TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); + heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray); + + if (!isNullArray[Anum_pg_dist_rebalance_jobs_citus_move_shard_placement - 1]) + { + /* citus_move_shard_placement job */ + ParseMoveJob( + job, + datumArray[Anum_pg_dist_rebalance_jobs_citus_move_shard_placement - 1]); + } + else + { + ereport(ERROR, (errmsg("undefined job type"))); + } + } + + systable_endscan(scanDescriptor); + table_close(pgDistRebalanceJobs, AccessShareLock); + + return job; +} + + +void +UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus) +{ + Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(), + RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs); + + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + + /* WHERE jobid = job->jobid */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(job->jobid)); + + const bool indexOK = true; + SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs, + DistRebalanceJobsJobsIdIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: " + UINT64_FORMAT, job->jobid))); + } + + Datum values[Natts_pg_dist_rebalance_jobs] = { 0 }; + bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 }; + bool replace[Natts_pg_dist_rebalance_jobs] = { 0 }; + + values[Anum_pg_dist_rebalance_jobs_status - 1] = + ObjectIdGetDatum(RebalanceJobStatusOid(newStatus)); + isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false; + replace[Anum_pg_dist_rebalance_jobs_status - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + + CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple); + + CommandCounterIncrement(); + + systable_endscan(scanDescriptor); + table_close(pgDistRebalanceJobs, NoLock); +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index dd79f8d1e..632f693eb 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1882,6 +1882,16 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.rebalance_check_interval", + gettext_noop("Time to wait between checks for scheduled rebalance jobs."), + NULL, + &RebalanceCheckInterval, + 1000, -1, 7 * 24 * 3600 * 1000, + PGC_SIGHUP, + GUC_UNIT_MS, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.recover_2pc_interval", gettext_noop("Sets the time to wait between recovering 2PCs."), diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 4e58bdd51..75271e314 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -76,3 +76,26 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text); #include "udfs/isolate_tenant_to_new_shard/11.1-1.sql" + +CREATE TYPE 
citus.citus_move_shard_placement_arguments AS (
+    shard_id bigint,
+    source_node_name text,
+    source_node_port integer,
+    target_node_name text,
+    target_node_port integer,
+    shard_transfer_mode citus.shard_transfer_mode
+);
+ALTER TYPE citus.citus_move_shard_placement_arguments SET SCHEMA pg_catalog;
+
+CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'done');
+ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog;
+
+CREATE TABLE citus.pg_dist_rebalance_jobs(
+    jobid bigserial NOT NULL,
+    status pg_catalog.citus_job_status default 'scheduled',
+    citus_move_shard_placement pg_catalog.citus_move_shard_placement_arguments
+);
+-- SELECT granted to PUBLIC in upgrade script
+ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog;
+CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid);
+CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid);
diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c
index 42a313b26..bc67912a1 100644
--- a/src/backend/distributed/utils/maintenanced.c
+++ b/src/backend/distributed/utils/maintenanced.c
@@ -93,6 +93,7 @@ typedef struct MaintenanceDaemonDBData
 double DistributedDeadlockDetectionTimeoutFactor = 2.0;
 int Recover2PCInterval = 60000;
 int DeferShardDeleteInterval = 15000;
+int RebalanceCheckInterval = 1000;
 
 /* config variables for metadata sync timeout */
 int MetadataSyncInterval = 60000;
@@ -119,7 +120,8 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg);
 static void MaintenanceDaemonErrorContext(void *arg);
 static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
 static void WarnMaintenanceDaemonNotStarted(void);
-
+static BackgroundWorkerHandle * StartRebalanceJobsBackgroundWorker(Oid database,
+																   Oid extensionOwner);
 
 /*
  * InitializeMaintenanceDaemon, called at server start, is responsible for
@@ -283,6 +285,9 @@ CitusMaintenanceDaemonMain(Datum main_arg)
 	TimestampTz lastStatStatementsPurgeTime = 0;
 	TimestampTz nextMetadataSyncTime = 0;
 
+	/* state kept for the rebalance check */
+	TimestampTz lastRebalanceCheck = 0;
+	BackgroundWorkerHandle *rebalanceBgwHandle = NULL;
 
 	/*
 	 * We do metadata sync in a separate background worker. We need its
@@ -683,6 +688,65 @@ CitusMaintenanceDaemonMain(Datum main_arg)
 			timeout = Min(timeout, (StatStatementsPurgeInterval * 1000));
 		}
 
+		pid_t rebalanceWorkerPid = 0;
+		BgwHandleStatus rebalanceWorkerStatus =
+			rebalanceBgwHandle != NULL ?
+			GetBackgroundWorkerPid(rebalanceBgwHandle, &rebalanceWorkerPid) :
+			BGWH_STOPPED;
+		if (TimestampDifferenceExceeds(lastRebalanceCheck, GetCurrentTimestamp(),
+									   RebalanceCheckInterval) &&
+			rebalanceWorkerStatus == BGWH_STOPPED)
+		{
+			/* clear old background worker for rebalancing before checking for new jobs */
+			if (rebalanceBgwHandle)
+			{
+				TerminateBackgroundWorker(rebalanceBgwHandle);
+				pfree(rebalanceBgwHandle);
+				rebalanceBgwHandle = NULL;
+			}
+
+			StartTransactionCommand();
+
+			bool shouldStartRebalanceJobsBackgroundWorker = false;
+			if (!LockCitusExtension())
+			{
+				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
+										"skipping rebalance jobs check")));
+			}
+			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
+			{
+				/* perform catalog precheck */
+				shouldStartRebalanceJobsBackgroundWorker = HasScheduledRebalanceJobs();
+			}
+
+			CommitTransactionCommand();
+
+			if (shouldStartRebalanceJobsBackgroundWorker)
+			{
+				/* spawn background worker */
+				ereport(LOG, (errmsg("found scheduled job waiting for rebalance, "
+									 "starting background worker for execution")));
+
+				rebalanceBgwHandle =
+					StartRebalanceJobsBackgroundWorker(MyDatabaseId,
+													   myDbData->userOid);
+
+				if (!rebalanceBgwHandle ||
+					GetBackgroundWorkerPid(rebalanceBgwHandle, &rebalanceWorkerPid) ==
+					BGWH_STOPPED)
+				{
+					ereport(WARNING, (errmsg("unable to start background worker for "
+											 "rebalance jobs")));
+
+					/* todo cooldown timer */
+				}
+			}
+
+			/* interval management */
+			lastRebalanceCheck = GetCurrentTimestamp();
+			timeout = Min(timeout, RebalanceCheckInterval);
+		}
+
 		/*
 		 * Wait until timeout, or until somebody wakes us up. Also cast the timeout to
 		 * integer where we've calculated it using double for not losing the precision.
@@ -731,6 +795,100 @@ CitusMaintenanceDaemonMain(Datum main_arg)
 }
 
 
+static BackgroundWorkerHandle *
+StartRebalanceJobsBackgroundWorker(Oid database, Oid extensionOwner)
+{
+	BackgroundWorker worker;
+	BackgroundWorkerHandle *handle = NULL;
+
+	/* Configure a worker. */
+	memset(&worker, 0, sizeof(worker));
+	SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
+				 "Citus Rebalance Jobs Worker: %u/%u",
+				 database, extensionOwner);
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+
+	/* don't restart, we manage restarts from maintenance daemon */
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
+	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
+			 "RebalanceJobsBackgroundWorkerMain");
+	worker.bgw_main_arg = ObjectIdGetDatum(database);
+	memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
+			 sizeof(Oid));
+	worker.bgw_notify_pid = MyProcPid;
+
+	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+	{
+		return NULL;
+	}
+
+	pid_t pid;
+	WaitForBackgroundWorkerStartup(handle, &pid);
+
+	return handle;
+}
+
+
+void
+RebalanceJobsBackgroundWorkerMain(Datum arg)
+{
+	Oid databaseOid = DatumGetObjectId(arg);
+
+	/* extension owner is passed via bgw_extra */
+	Oid extensionOwner = InvalidOid;
+	memcpy_s(&extensionOwner, sizeof(extensionOwner),
+			 MyBgworkerEntry->bgw_extra, sizeof(Oid));
+
+	BackgroundWorkerUnblockSignals();
+
+	/* connect to database, after that we can actually access catalogs */
+	BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
+
+	/* make worker recognizable in pg_stat_activity */
+	pgstat_report_appname("rebalance jobs worker");
+
+	bool hasJobs = true;
+	while (hasJobs)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		InvalidateMetadataSystemCache();
+		StartTransactionCommand();
+
+		PushActiveSnapshot(GetTransactionSnapshot());
+
+		if (!LockCitusExtension())
+		{
+			ereport(DEBUG1, (errmsg("could not lock the citus extension, "
+									"skipping rebalance job execution")));
+		}
+		else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
+		{
+			RebalanceJob *job = GetScheduledRebalanceJob();
+			if (job)
+			{
+				ereport(LOG, (errmsg("found job with jobid: " INT64_FORMAT, job->jobid)));
+
+				/* TODO execute job */
+
+				UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE);
+			}
+			else
+			{
+				hasJobs = false;
+			}
+		}
+
+		PopActiveSnapshot();
+		CommitTransactionCommand();
+		ProcessCompletedNotifies();
+	}
+}
+
+
 /*
  * MaintenanceDaemonShmemSize computes how much shared memory is required.
 */
diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h
index a09d89085..306fab5d5 100644
--- a/src/include/distributed/maintenanced.h
+++ b/src/include/distributed/maintenanced.h
@@ -30,5 +30,6 @@ extern void InitializeMaintenanceDaemonBackend(void);
 extern bool LockCitusExtension(void);
 
 extern void CitusMaintenanceDaemonMain(Datum main_arg);
+extern void RebalanceJobsBackgroundWorkerMain(Datum arg);
 
 #endif /* MAINTENANCED_H */
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index 0e9952e2d..06b664470 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -227,6 +227,7 @@ extern Oid DistPartitionRelationId(void);
 extern Oid DistShardRelationId(void);
 extern Oid DistPlacementRelationId(void);
 extern Oid DistNodeRelationId(void);
+extern Oid DistRebalanceJobsRelationId(void);
 extern Oid DistRebalanceStrategyRelationId(void);
 extern Oid DistLocalGroupIdRelationId(void);
 extern Oid DistObjectRelationId(void);
@@ -236,6 +237,8 @@ extern Oid DistEnabledCustomAggregatesId(void);
 extern Oid DistNodeNodeIdIndexId(void);
 extern Oid DistPartitionLogicalRelidIndexId(void);
 extern Oid DistPartitionColocationidIndexId(void);
+extern Oid DistRebalanceJobsJobsIdIndexId(void);
+extern Oid DistRebalanceJobsStatusJobsIdIndexId(void);
 extern Oid DistShardLogicalRelidIndexId(void);
 extern Oid DistShardShardidIndexId(void);
 extern Oid DistPlacementShardidIndexId(void);
@@ -268,6 +271,8 @@ extern Oid SecondaryNodeRoleId(void);
 extern Oid CitusCopyFormatTypeId(void);
 extern Oid TextCopyFormatId(void);
 extern Oid BinaryCopyFormatId(void);
+extern Oid JobStatusDoneId(void);
+extern Oid JobStatusScheduledId(void);
 
 /* user related functions */
 extern Oid CitusExtensionOwner(void);
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index 7d9d49646..87a8e9931 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -24,6 +24,7 @@
 #include "distributed/citus_nodes.h"
 #include "distributed/connection_management.h"
 #include "distributed/errormessage.h"
+#include "distributed/pg_dist_rebalance_jobs.h"
 #include "distributed/relay_utility.h"
 #include "distributed/worker_manager.h"
 #include "utils/acl.h"
@@ -203,6 +204,38 @@ typedef enum SizeQueryType
 	TABLE_SIZE /* pg_table_size() */
 } SizeQueryType;
 
+typedef enum RebalanceJobStatus
+{
+	REBALANCE_JOB_STATUS_UNKNOWN,
+	REBALANCE_JOB_STATUS_SCHEDULED,
+	REBALANCE_JOB_STATUS_DONE
+} RebalanceJobStatus;
+
+typedef enum RebalanceJobType
+{
+	REBALANCE_JOB_TYPE_UNKNOWN,
+	REBALANCE_JOB_TYPE_MOVE
+} RebalanceJobType;
+
+typedef struct RebalanceJob
+{
+	int64 jobid;
+	RebalanceJobStatus status;
+
+	RebalanceJobType jobType;
+	union
+	{
+		struct
+		{
+			uint64 shardId;
+			char *sourceName;
+			int32 sourcePort;
+			char *targetName;
+			int32 targetPort;
+		} move;
+	} jobArguments;
+} RebalanceJob;
+
 
 /* Size functions */
 extern Datum citus_table_size(PG_FUNCTION_ARGS);
@@ -311,4 +344,7 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId,
 										Oid ownerRelationId);
 extern void AlterSequenceType(Oid seqOid, Oid typeOid);
 extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
+extern bool HasScheduledRebalanceJobs(void);
+extern RebalanceJob * GetScheduledRebalanceJob(void);
+extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);
 #endif   /* METADATA_UTILITY_H */
diff --git a/src/include/distributed/pg_dist_rebalance_jobs.h b/src/include/distributed/pg_dist_rebalance_jobs.h
new file mode 100644
index 000000000..1073b6dc2
--- /dev/null
+++ b/src/include/distributed/pg_dist_rebalance_jobs.h
@@ -0,0 +1,34 @@
+
+#ifndef CITUS_PG_DIST_REBALANCE_JOBS_H
+#define CITUS_PG_DIST_REBALANCE_JOBS_H
+
+/* ----------------
+ *		pg_dist_rebalance_jobs definition.
+ * ----------------
+ */
+typedef struct FormData_pg_dist_rebalance_job
+{
+	int64 jobid;
+	Oid status;
+#ifdef CATALOG_VARLEN           /* variable-length fields start here */
+	text citus_move_shard_placement; /* text? we need to understand how to read a variable length stored custom type */
+#endif
+} FormData_pg_dist_rebalance_job;
+
+/* ----------------
+ *		Form_pg_dist_rebalance_job corresponds to a pointer to a tuple with
+ *		the format of the pg_dist_rebalance_jobs relation.
+ * ----------------
+ */
+typedef FormData_pg_dist_rebalance_job *Form_pg_dist_rebalance_job;
+
+/* ----------------
+ *		compiler constants for pg_dist_rebalance_jobs
+ * ----------------
+ */
+#define Natts_pg_dist_rebalance_jobs 3
+#define Anum_pg_dist_rebalance_jobs_jobid 1
+#define Anum_pg_dist_rebalance_jobs_status 2
+#define Anum_pg_dist_rebalance_jobs_citus_move_shard_placement 3
+
+#endif /* CITUS_PG_DIST_REBALANCE_JOBS_H */
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h
index 8a98254f9..14ced520d 100644
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -13,6 +13,7 @@
 
 /* GUC to configure deferred shard deletion */
 extern int DeferShardDeleteInterval;
+extern int RebalanceCheckInterval;
 extern bool DeferShardDeleteOnMove;
 extern double DesiredPercentFreeAfterMove;
 extern bool CheckAvailableSpaceBeforeMove;
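
Usage sketch (not part of the patch above; the shard id, node names, ports, and transfer mode are made-up illustration values): with the catalog objects and GUC introduced here, a move job could be queued and then picked up by the maintenance daemon once citus.rebalance_check_interval elapses. Note that at this stage of the patch the background worker only logs the job and flips its status to 'done'; execution itself is still a TODO in RebalanceJobsBackgroundWorkerMain.

    -- poll for scheduled jobs every 5 seconds; the GUC is PGC_SIGHUP, so a reload is enough
    ALTER SYSTEM SET citus.rebalance_check_interval TO 5000;
    SELECT pg_reload_conf();

    -- queue a move job; status defaults to 'scheduled'
    INSERT INTO pg_catalog.pg_dist_rebalance_jobs (citus_move_shard_placement)
    VALUES (ROW(102008, 'worker-1', 5432, 'worker-2', 5432, 'auto')
            ::pg_catalog.citus_move_shard_placement_arguments);

    -- once the background worker has picked the job up, its status reads 'done'
    SELECT jobid, status FROM pg_catalog.pg_dist_rebalance_jobs ORDER BY jobid;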