execute task based on dedicated compsite type

background-job-details
Nils Dijk 2022-07-08 15:02:32 +02:00 committed by Jelte Fennema
parent a7428c4ce6
commit 3365771162
2 changed files with 45 additions and 3 deletions

View File

@ -40,6 +40,7 @@
#include "distributed/shard_cleaner.h"
#include "distributed/metadata_sync.h"
#include "distributed/query_stats.h"
#include "distributed/repair_shards.h"
#include "distributed/statistics_collection.h"
#include "distributed/transaction_recovery.h"
#include "distributed/version_compat.h"
@ -54,6 +55,7 @@
#include "storage/lwlock.h"
#include "tcop/tcopprot.h"
#include "common/hashfn.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
@ -122,6 +124,7 @@ static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);
static BackgroundWorkerHandle * StartRebalanceJobsBackgroundWorker(Oid database,
Oid extensionOwner);
static bool ExecuteRebalanceJob(RebalanceJob *job);
/*
* InitializeMaintenanceDaemon, called at server start, is responsible for
@ -850,6 +853,8 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
/* make worker recognizable in pg_stat_activity */
pgstat_report_appname("rebalance jobs worker");
ereport(LOG, (errmsg("background jobs runner")));
bool hasJobs = true;
while (hasJobs)
{
@ -872,10 +877,11 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
{
ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid)));
/* TODO execute job */
if (ExecuteRebalanceJob(job))
{
UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE);
}
}
else
{
hasJobs = false;
@ -889,6 +895,40 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
}
static bool
ExecuteMoveRebalanceJob(RebalanceJob *job)
{
DirectFunctionCall6Coll(citus_move_shard_placement, InvalidOid,
Int64GetDatum(job->jobArguments.move.shardId),
CStringGetTextDatum(job->jobArguments.move.sourceName),
Int32GetDatum(job->jobArguments.move.sourcePort),
CStringGetTextDatum(job->jobArguments.move.targetName),
Int32GetDatum(job->jobArguments.move.targetPort),
ObjectIdGetDatum(16598)); /* fix hardcoded value of 'block_writes' */
return true;
}
static bool
ExecuteRebalanceJob(RebalanceJob *job)
{
switch (job->jobType)
{
case REBALANCE_JOB_TYPE_MOVE:
{
return ExecuteMoveRebalanceJob(job);
}
default:
{
ereport(ERROR, (errmsg("undefined rebalance job for jobid: %ld",
job->jobid)));
}
}
}
/*
* MaintenanceDaemonShmemSize computes how much shared memory is required.
*/

View File

@ -17,3 +17,5 @@ extern void ErrorIfMoveUnsupportedTableType(Oid relationId);
extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode,
List *shardIntervalList, char *snapshotName);
extern void VerifyTablesHaveReplicaIdentity(List *colocatedTableList);
extern Datum citus_move_shard_placement(PG_FUNCTION_ARGS);