mirror of https://github.com/citusdata/citus.git
WIP refactor background job execution
parent 80e2edf1a5
commit ec292cde48
@@ -2667,7 +2668,8 @@ GetScheduledRebalanceJobByJobID(int64 jobId)
 
 
 void
-UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus)
+UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status, int32 *retry_count,
+                char *message)
 {
     Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
                                               RowExclusiveLock);
@@ -2678,7 +2679,7 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus)
 
     /* WHERE jobid = job->jobid */
     ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid,
-                BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(job->jobid));
+                BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid));
 
     const bool indexOK = true;
     SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
@@ -2690,69 +2691,7 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus)
     if (!HeapTupleIsValid(heapTuple))
     {
         ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: "
-                               UINT64_FORMAT, job->jobid)));
-    }
-
-    Datum values[Natts_pg_dist_rebalance_jobs] = { 0 };
-    bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 };
-    bool replace[Natts_pg_dist_rebalance_jobs] = { 0 };
-
-    values[Anum_pg_dist_rebalance_jobs_status - 1] =
-        ObjectIdGetDatum(RebalanceJobStatusOid(newStatus));
-    isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false;
-    replace[Anum_pg_dist_rebalance_jobs_status - 1] = true;
-
-    /* TODO figure out a nice way on how to update a tuple selectively */
-    if (newStatus == REBALANCE_JOB_STATUS_RUNNING)
-    {
-        /* update pid for running status */
-        values[Anum_pg_dist_rebalance_jobs_pid - 1] = Int32GetDatum((int32) MyProcPid);
-        isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = false;
-        replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
-    }
-    else
-    {
-        values[Anum_pg_dist_rebalance_jobs_pid - 1] = 0;
-        isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
-        replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
-    }
-
-    heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
-
-    CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple);
-
-    CommandCounterIncrement();
-
-    systable_endscan(scanDescriptor);
-    table_close(pgDistRebalanceJobs, NoLock);
-}
-
-
-bool
-UpdateJobError(RebalanceJob *job, ErrorData *edata)
-{
-    Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
-                                              RowExclusiveLock);
-    TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
-
-    ScanKeyData scanKey[1];
-    int scanKeyCount = 1;
-
-    /* WHERE jobid = job->jobid */
-    ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid,
-                BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(job->jobid));
-
-    const bool indexOK = true;
-    SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
-                                                    DistRebalanceJobsJobsIdIndexId(),
-                                                    indexOK,
-                                                    NULL, scanKeyCount, scanKey);
-
-    HeapTuple heapTuple = systable_getnext(scanDescriptor);
-    if (!HeapTupleIsValid(heapTuple))
-    {
-        ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: "
-                               UINT64_FORMAT, job->jobid)));
+                               UINT64_FORMAT, jobid)));
     }
 
     Datum values[Natts_pg_dist_rebalance_jobs] = { 0 };
@@ -2761,79 +2700,45 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata)
 
     heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
 
-    /* increment retry count */
-    int retryCount = 0;
-    if (!isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1])
-    {
-        retryCount = DatumGetInt32(values[Anum_pg_dist_rebalance_jobs_retry_count - 1]);
-        retryCount++;
-    }
-    values[Anum_pg_dist_rebalance_jobs_retry_count - 1] = Int32GetDatum(retryCount);
-    isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1] = false;
-    replace[Anum_pg_dist_rebalance_jobs_retry_count - 1] = true;
-
-    values[Anum_pg_dist_rebalance_jobs_pid - 1] = InvalidOid;
-    isnull[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
-    replace[Anum_pg_dist_rebalance_jobs_pid - 1] = true;
-
-    bool statusError = false;
-    if (retryCount >= 3)
-    {
-        /* after 3 failures we will transition the job to error and stop executing */
-        values[Anum_pg_dist_rebalance_jobs_status - 1] =
-            ObjectIdGetDatum(JobStatusErrorId());
-        isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false;
-        replace[Anum_pg_dist_rebalance_jobs_status - 1] = true;
-
-        statusError = true;
-    }
-
-    StringInfoData buf = { 0 };
-    initStringInfo(&buf);
-
-    if (edata->message)
-    {
-        if (buf.len > 0)
-        {
-            appendStringInfo(&buf, "\n");
-        }
-        appendStringInfoString(&buf, "ERROR: ");
-        appendStringInfoString(&buf, edata->message);
-    }
-
-    if (edata->hint)
-    {
-        if (buf.len > 0)
-        {
-            appendStringInfo(&buf, "\n");
-        }
-        appendStringInfoString(&buf, "HINT: ");
-        appendStringInfoString(&buf, edata->hint);
-    }
-
-    if (edata->detail)
-    {
-        if (buf.len > 0)
-        {
-            appendStringInfo(&buf, "\n");
-        }
-        appendStringInfoString(&buf, "DETAIL: ");
-        appendStringInfoString(&buf, edata->detail);
-    }
-
-    if (edata->context)
-    {
-        if (buf.len > 0)
-        {
-            appendStringInfo(&buf, "\n");
-        }
-        appendStringInfoString(&buf, "CONTEXT: ");
-        appendStringInfoString(&buf, edata->context);
-    }
-
-    values[Anum_pg_dist_rebalance_jobs_message - 1] = CStringGetTextDatum(buf.data);
-    isnull[Anum_pg_dist_rebalance_jobs_message - 1] = false;
-    replace[Anum_pg_dist_rebalance_jobs_message - 1] = true;
+#define UPDATE_FIELD(field, newNull, newValue) \
+    replace[(field - 1)] = ((newNull != isnull[(field - 1)]) || (values[(field - 1)] != \
+                            newValue)); \
+    isnull[(field - 1)] = (newNull); \
+    values[(field - 1)] = (newValue);
+
+    if (pid)
+    {
+        UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_pid, false, Int32GetDatum(*pid));
+    }
+    else
+    {
+        UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_pid, true, InvalidOid);
+    }
+
+    Oid statusOid = ObjectIdGetDatum(RebalanceJobStatusOid(status));
+    UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_status, false, statusOid);
+
+    if (retry_count)
+    {
+        UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_retry_count, false, Int32GetDatum(
+                         *retry_count));
+    }
+    else
+    {
+        UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_retry_count, true, InvalidOid);
+    }
+
+    if (message)
+    {
+        Oid messageOid = CStringGetTextDatum(message);
+        UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_message, false, messageOid);
+    }
+    else
+    {
+        UPDATE_FIELD(Anum_pg_dist_rebalance_jobs_message, true, InvalidOid);
+    }
+
+#undef UPDATE_FIELD
 
     heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
 
@@ -2843,14 +2748,6 @@ UpdateJobError(RebalanceJob *job, ErrorData *edata)
 
     systable_endscan(scanDescriptor);
     table_close(pgDistRebalanceJobs, NoLock);
-
-    /* when we have changed the status to Error we will need to unschedule all dependent jobs (recursively) */
-    if (statusError)
-    {
-        UnscheduleDependantJobs(job->jobid);
-    }
-
-    return statusError;
 }
 
 
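Note on the new UpdateJobStatus contract: the status column is always rewritten, while the pid, retry_count and message columns follow the pointer arguments — a non-NULL pointer stores the pointed-to value and a NULL pointer sets the column to NULL — with UPDATE_FIELD marking a column for replacement only when its value or nullness actually changes. A minimal sketch of the call sites; the RUNNING and DONE calls below are the ones this commit adds in the background jobs worker, whereas the failure call, the REBALANCE_JOB_STATUS_ERROR value and the job->retryCount field are illustrative assumptions only:

    /* job handed to an executor: record its pid and mark it running */
    pid_t pid = 0;
    GetBackgroundWorkerPid(handle, &pid);
    UpdateJobStatus(job->jobid, &pid, REBALANCE_JOB_STATUS_RUNNING, NULL, NULL);

    /* job finished: set the status and clear pid, retry_count and message */
    UpdateJobStatus(job->jobid, NULL, REBALANCE_JOB_STATUS_DONE, NULL, NULL);

    /* hypothetical failure path: bump the retry counter and store a message */
    int32 retryCount = job->retryCount + 1;    /* assumed field, not in this diff */
    UpdateJobStatus(job->jobid, NULL, REBALANCE_JOB_STATUS_ERROR, &retryCount,
                    "ERROR: relation \"dist_table\" does not exist");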
@@ -32,6 +32,7 @@
 #include "common/string.h"
 #include "executor/executor.h"
 #include "distributed/backend_data.h"
+#include "distributed/background_jobs.h"
 #include "distributed/citus_depended_object.h"
 #include "distributed/citus_nodefuncs.h"
 #include "distributed/citus_safe_lib.h"
@@ -0,0 +1,537 @@
+
+#include "postgres.h"
+
+#include "safe_mem_lib.h"
+
+#include "libpq/pqmq.h"
+#include "parser/analyze.h"
+#include "pgstat.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/pquery.h"
+#include "tcop/tcopprot.h"
+#include "tcop/utility.h"
+#include "utils/backend_status.h"
+#include "utils/memutils.h"
+#include "utils/portal.h"
+#include "utils/ps_status.h"
+#include "utils/resowner.h"
+#include "utils/snapmgr.h"
+#include "utils/timeout.h"
+
+#include "distributed/background_jobs.h"
+#include "distributed/citus_safe_lib.h"
+#include "distributed/listutils.h"
+#include "distributed/maintenanced.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/metadata_utility.h"
+#include "distributed/shard_cleaner.h"
+
+bool RebalanceJobDebugDelay = false;
+
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define CITUS_BACKGROUND_JOB_MAGIC 0x51028081
+#define CITUS_BACKGROUND_JOB_KEY_DATABASE 0
+#define CITUS_BACKGROUND_JOB_KEY_USERNAME 1
+#define CITUS_BACKGROUND_JOB_KEY_COMMAND 2
+#define CITUS_BACKGROUND_JOB_KEY_QUEUE 3
+#define CITUS_BACKGROUND_JOB_NKEYS 4
+
+static BackgroundWorkerHandle * StartCitusBackgroundJobExecuter(char *database,
+                                                                char *user,
+                                                                char *command);
+static void ExecuteSqlString(const char *sql);
+
+BackgroundWorkerHandle *
+StartCitusBackgroundJobWorker(Oid database, Oid extensionOwner)
+{
+    BackgroundWorker worker = { 0 };
+    BackgroundWorkerHandle *handle = NULL;
+
+    /* Configure a worker. */
+    memset(&worker, 0, sizeof(worker));
+    SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
+                 "Citus Rebalance Jobs Worker: %u/%u",
+                 database, extensionOwner);
+    worker.bgw_flags =
+        BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+    worker.bgw_start_time = BgWorkerStart_ConsistentState;
+
+    /* don't restart, we manage restarts from maintenance daemon */
+    worker.bgw_restart_time = BGW_NEVER_RESTART;
+    strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
+    strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
+             "CitusBackgroundJobMain");
+    worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
+    memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
+             sizeof(Oid));
+    worker.bgw_notify_pid = MyProcPid;
+
+    if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+    {
+        return NULL;
+    }
+
+    pid_t pid;
+    WaitForBackgroundWorkerStartup(handle, &pid);
+
+    return handle;
+}
+
+
+void
+CitusBackgroundJobMain(Datum arg)
+{
+    Oid databaseOid = DatumGetObjectId(arg);
+
+    /* extension owner is passed via bgw_extra */
+    Oid extensionOwner = InvalidOid;
+    memcpy_s(&extensionOwner, sizeof(extensionOwner),
+             MyBgworkerEntry->bgw_extra, sizeof(Oid));
+
+    BackgroundWorkerUnblockSignals();
+
+    /* connect to database, after that we can actually access catalogs */
+    BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
+
+    /* make worker recognizable in pg_stat_activity */
+    pgstat_report_appname("rebalance jobs worker");
+
+    ereport(LOG, (errmsg("background jobs runner")));
+
+    if (RebalanceJobDebugDelay)
+    {
+        pg_usleep(30 * 1000 * 1000);
+    }
+
+    MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext,
+                                                                "PerJobContext",
+                                                                ALLOCSET_DEFAULT_MINSIZE,
+                                                                ALLOCSET_DEFAULT_INITSIZE,
+                                                                ALLOCSET_DEFAULT_MAXSIZE);
+
+    /*
+     * First we find all jobs that are running, we need to check if they are still running
+     * if not reset their state back to scheduled.
+     */
+    {
+        StartTransactionCommand();
+        PushActiveSnapshot(GetTransactionSnapshot());
+
+        /* TODO have an actual function to check if the worker is still running */
+        ResetRunningJobs();
+
+        PopActiveSnapshot();
+        CommitTransactionCommand();
+    }
+
+
+    MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext);
+    bool hasJobs = true;
+    while (hasJobs)
+    {
+        MemoryContextReset(perJobContext);
+
+        CHECK_FOR_INTERRUPTS();
+
+        InvalidateMetadataSystemCache();
+        StartTransactionCommand();
+        PushActiveSnapshot(GetTransactionSnapshot());
+
+        /*
+         * We need to load the job into the perJobContext as we will switch contexts
+         * later due to the committing and starting of new transactions
+         */
+        MemoryContext oldContext = MemoryContextSwitchTo(perJobContext);
+        RebalanceJob *job = GetRunableRebalanceJob();
+        MemoryContextSwitchTo(oldContext);
+
+        if (!job)
+        {
+            PopActiveSnapshot();
+            CommitTransactionCommand();
+
+            hasJobs = false;
+            break;
+        }
+
+        PopActiveSnapshot();
+        CommitTransactionCommand();
+
+        MemoryContextSwitchTo(perJobContext);
+
+        /* TODO find the actual database and username */
+        BackgroundWorkerHandle *handle =
+            StartCitusBackgroundJobExecuter("postgres", "nilsdijk", job->command);
+
+        if (handle == NULL)
+        {
+            /* TODO something better here */
+            ereport(ERROR, (errmsg("unable to start background worker")));
+        }
+
+        pid_t pid = 0;
+        GetBackgroundWorkerPid(handle, &pid);
+
+        ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid)));
+
+        StartTransactionCommand();
+        PushActiveSnapshot(GetTransactionSnapshot());
+
+        /* Update job status to indicate it is running */
+        UpdateJobStatus(job->jobid, &pid, REBALANCE_JOB_STATUS_RUNNING, NULL, NULL);
+
+        PopActiveSnapshot();
+        CommitTransactionCommand();
+
+        MemoryContextSwitchTo(perJobContext);
+
+        /* TODO keep polling the job */
+        while (GetBackgroundWorkerPid(handle, &pid) != BGWH_STOPPED)
+        {
+            int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+            int rc = WaitLatch(MyLatch, latchFlags, (long) 1000, PG_WAIT_EXTENSION);
+
+            /* emergency bailout if postmaster has died */
+            if (rc & WL_POSTMASTER_DEATH)
+            {
+                proc_exit(1);
+            }
+
+            if (rc & WL_LATCH_SET)
+            {
+                ResetLatch(MyLatch);
+                CHECK_FOR_INTERRUPTS();
+            }
+        }
+
+        StartTransactionCommand();
+        PushActiveSnapshot(GetTransactionSnapshot());
+
+        /* TODO job can actually also have failed*/
+        UpdateJobStatus(job->jobid, NULL, REBALANCE_JOB_STATUS_DONE, NULL, NULL);
+
+        PopActiveSnapshot();
+        CommitTransactionCommand();
+    }
+
+    MemoryContextSwitchTo(oldContextPerJob);
+    MemoryContextDelete(perJobContext);
+}
+
+
+static dsm_segment *
+StoreArgumentsInDSM(char *database, char *username, char *command)
+{
+    /*
+     * Create the shared memory that we will pass to the background
+     * worker process. We use DSM_CREATE_NULL_IF_MAXSEGMENTS so that we
+     * do not ERROR here. This way, we can mark the job as failed and
+     * keep the launcher process running normally.
+     */
+    shm_toc_estimator e = { 0 };
+    shm_toc_initialize_estimator(&e);
+    shm_toc_estimate_chunk(&e, strlen(database) + 1);
+    shm_toc_estimate_chunk(&e, strlen(username) + 1);
+    shm_toc_estimate_chunk(&e, strlen(command) + 1);
+#define QUEUE_SIZE ((Size) 65536)
+    shm_toc_estimate_chunk(&e, QUEUE_SIZE);
+    shm_toc_estimate_keys(&e, CITUS_BACKGROUND_JOB_NKEYS);
+    Size segsize = shm_toc_estimate(&e);
+
+    dsm_segment *seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
+    if (seg == NULL)
+    {
+        ereport(ERROR,
+                (errmsg("max number of DSM segments may has been reached")));
+
+        return NULL;
+    }
+
+    shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_JOB_MAGIC, dsm_segment_address(seg),
+                                  segsize);
+
+    Size size = strlen(database) + 1;
+    char *databaseTarget = shm_toc_allocate(toc, size);
+    strcpy_s(databaseTarget, size, database);
+    shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, databaseTarget);
+
+    size = strlen(username) + 1;
+    char *usernameTarget = shm_toc_allocate(toc, size);
+    strcpy_s(usernameTarget, size, username);
+    shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, usernameTarget);
+
+    size = strlen(command) + 1;
+    char *commandTarget = shm_toc_allocate(toc, size);
+    strcpy_s(commandTarget, size, command);
+    shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_COMMAND, commandTarget);
+
+    shm_mq *mq = shm_mq_create(shm_toc_allocate(toc, QUEUE_SIZE), QUEUE_SIZE);
+    shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_QUEUE, mq);
+    shm_mq_set_receiver(mq, MyProc);
+
+    /*
+     * Attach the queue before launching a worker, so that we'll automatically
+     * detach the queue if we error out. (Otherwise, the worker might sit
+     * there trying to write the queue long after we've gone away.)
+     */
+    MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+    shm_mq_attach(mq, seg, NULL);
+    MemoryContextSwitchTo(oldcontext);
+
+    return seg;
+}
+
+
+static BackgroundWorkerHandle *
+StartCitusBackgroundJobExecuter(char *database, char *username, char *command)
+{
+    dsm_segment *seg = StoreArgumentsInDSM(database, username, command);
+
+    /* Configure a worker. */
+    BackgroundWorker worker = { 0 };
+    memset(&worker, 0, sizeof(worker));
+    SafeSnprintf(worker.bgw_name, BGW_MAXLEN, "Citus Background Job Executor: %s/%s",
+                 database, username);
+    worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+    worker.bgw_start_time = BgWorkerStart_ConsistentState;
+
+    /* don't restart, we manage restarts from maintenance daemon */
+    worker.bgw_restart_time = BGW_NEVER_RESTART;
+    strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
+    strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
+             "CitusBackgroundJobExecuter");
+    worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+    worker.bgw_notify_pid = MyProcPid;
+
+    BackgroundWorkerHandle *handle = NULL;
+    if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+    {
+        dsm_detach(seg);
+        return NULL;
+    }
+
+    pid_t pid = { 0 };
+    WaitForBackgroundWorkerStartup(handle, &pid);
+
+    return handle;
+}
+
+
+/*
+ * Background worker logic.
+ *
+ * based on the background worker logic in pgcron
+ */
+void
+CitusBackgroundJobExecuter(Datum main_arg)
+{
+    /*
+     * TODO figure out if we need this signal handler that is in pgcron
+     * pqsignal(SIGTERM, pg_cron_background_worker_sigterm);
+     */
+    BackgroundWorkerUnblockSignals();
+
+    /* Set up a memory context and resource owner. */
+    Assert(CurrentResourceOwner == NULL);
+    CurrentResourceOwner = ResourceOwnerCreate(NULL, "citus background job");
+    CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+                                                 "citus background job execution",
+                                                 ALLOCSET_DEFAULT_MINSIZE,
+                                                 ALLOCSET_DEFAULT_INITSIZE,
+                                                 ALLOCSET_DEFAULT_MAXSIZE);
+
+    /* Set up a dynamic shared memory segment. */
+    dsm_segment *seg = dsm_attach(DatumGetInt32(main_arg));
+    if (seg == NULL)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("unable to map dynamic shared memory segment")));
+    }
+
+    shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_JOB_MAGIC, dsm_segment_address(seg));
+    if (toc == NULL)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("bad magic number in dynamic shared memory segment")));
+    }
+
+    char *database = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, false);
+    char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, false);
+    char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_COMMAND, false);
+    shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_QUEUE, false);
+
+    shm_mq_set_sender(mq, MyProc);
+    shm_mq_handle *responseq = shm_mq_attach(mq, seg, NULL);
+    pq_redirect_to_shm_mq(seg, responseq);
+
+    BackgroundWorkerInitializeConnection(database, username, 0);
+
+    /* Prepare to execute the query. */
+    SetCurrentStatementStartTimestamp();
+    debug_query_string = command;
+    pgstat_report_activity(STATE_RUNNING, command);
+    StartTransactionCommand();
+    if (StatementTimeout > 0)
+    {
+        enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout);
+    }
+    else
+    {
+        disable_timeout(STATEMENT_TIMEOUT, false);
+    }
+
+    /* Execute the query. */
+    ExecuteSqlString(command);
+
+    /* Post-execution cleanup. */
+    disable_timeout(STATEMENT_TIMEOUT, false);
+    CommitTransactionCommand();
+    pgstat_report_activity(STATE_IDLE, command);
+    pgstat_report_stat(true);
+
+    /* Signal that we are done. */
+    ReadyForQuery(DestRemote);
+
+    dsm_detach(seg);
+    proc_exit(0);
+}
+
+
+/*
+ * Execute given SQL string without SPI or a libpq session.
+ */
+static void
+ExecuteSqlString(const char *sql)
+{
+    /*
+     * Parse the SQL string into a list of raw parse trees.
+     *
+     * Because we allow statements that perform internal transaction control,
+     * we can't do this in TopTransactionContext; the parse trees might get
+     * blown away before we're done executing them.
+     */
+    MemoryContext parsecontext = AllocSetContextCreate(CurrentMemoryContext,
+                                                       "query parse/plan",
+                                                       ALLOCSET_DEFAULT_MINSIZE,
+                                                       ALLOCSET_DEFAULT_INITSIZE,
+                                                       ALLOCSET_DEFAULT_MAXSIZE);
+    MemoryContext oldcontext = MemoryContextSwitchTo(parsecontext);
+    List *raw_parsetree_list = pg_parse_query(sql);
+    int commands_remaining = list_length(raw_parsetree_list);
+    bool isTopLevel = commands_remaining == 1;
+    MemoryContextSwitchTo(oldcontext);
+
+    /*
+     * Do parse analysis, rule rewrite, planning, and execution for each raw
+     * parsetree. We must fully execute each query before beginning parse
+     * analysis on the next one, since there may be interdependencies.
+     */
+    RawStmt *parsetree = NULL;
+    foreach_ptr(parsetree, raw_parsetree_list)
+    {
+        /*
+         * We don't allow transaction-control commands like COMMIT and ABORT
+         * here. The entire SQL statement is executed as a single transaction
+         * which commits if no errors are encountered.
+         */
+        if (IsA(parsetree, TransactionStmt))
+        {
+            ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                            errmsg(
+                                "transaction control statements are not allowed in background job")));
+        }
+
+        /*
+         * Get the command name for use in status display (it also becomes the
+         * default completion tag, down inside PortalRun). Set ps_status and
+         * do any special start-of-SQL-command processing needed by the
+         * destination.
+         */
+        CommandTag commandTag = CreateCommandTag(parsetree->stmt);
+        set_ps_display(GetCommandTagName(commandTag));
+        BeginCommand(commandTag, DestNone);
+
+        /* Set up a snapshot if parse analysis/planning will need one. */
+        bool snapshot_set = false;
+        if (analyze_requires_snapshot(parsetree))
+        {
+            PushActiveSnapshot(GetTransactionSnapshot());
+            snapshot_set = true;
+        }
+
+        /*
+         * OK to analyze, rewrite, and plan this query.
+         *
+         * As with parsing, we need to make sure this data outlives the
+         * transaction, because of the possibility that the statement might
+         * perform internal transaction control.
+         */
+        oldcontext = MemoryContextSwitchTo(parsecontext);
+
+#if PG_VERSION_NUM >= 150000
+        List *querytree_list =
+            pg_analyze_and_rewrite_fixedparams(parsetree, sql, NULL, 0, NULL);
+#else
+        List *querytree_list =
+            pg_analyze_and_rewrite(parsetree, sql, NULL, 0, NULL);
+#endif
+
+        List *plantree_list = pg_plan_queries(querytree_list, sql, 0, NULL);
+
+        /* Done with the snapshot used for parsing/planning */
+        if (snapshot_set)
+        {
+            PopActiveSnapshot();
+        }
+
+        /* If we got a cancel signal in analysis or planning, quit */
+        CHECK_FOR_INTERRUPTS();
+
+        /*
+         * Execute the query using the unnamed portal.
+         */
+        Portal portal = CreatePortal("", true, true);
+
+        /* Don't display the portal in pg_cursors */
+        portal->visible = false;
+        PortalDefineQuery(portal, NULL, sql, commandTag, plantree_list, NULL);
+        PortalStart(portal, NULL, 0, InvalidSnapshot);
+        int16 format[] = { 1 };
+        PortalSetResultFormat(portal, lengthof(format), format); /* binary format */
+
+        commands_remaining--;
+        DestReceiver *receiver = CreateDestReceiver(DestNone);
+
+        /*
+         * Only once the portal and destreceiver have been established can
+         * we return to the transaction context. All that stuff needs to
+         * survive an internal commit inside PortalRun!
+         */
+        MemoryContextSwitchTo(oldcontext);
+
+        /* Here's where we actually execute the command. */
+        QueryCompletion qc = { 0 };
+        (void) PortalRun(portal, FETCH_ALL, isTopLevel, true, receiver, receiver, &qc);
+
+        /* Clean up the receiver. */
+        (*receiver->rDestroy)(receiver);
+
+        /*
+         * Send a CommandComplete message even if we suppressed the query
+         * results. The user backend will report these in the absence of
+         * any true query results.
+         */
+        EndCommand(&qc, DestRemote, false);
+
+        /* Clean up the portal. */
+        PortalDrop(portal, false);
+    }
+
+    /* Be sure to advance the command counter after the last script command */
+    CommandCounterIncrement();
+}
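Condensed, the handoff between the jobs worker and the per-job executor added above is a dynamic shared memory segment whose table of contents is keyed by the CITUS_BACKGROUND_JOB_KEY_* constants. The sketch below only rearranges calls that already appear in StoreArgumentsInDSM, StartCitusBackgroundJobExecuter and CitusBackgroundJobExecuter, with allocation and error handling elided, to show the two halves of the contract next to each other:

    /* launcher side: serialize the arguments and hand the segment to the worker */
    shm_toc *toc = shm_toc_create(CITUS_BACKGROUND_JOB_MAGIC,
                                  dsm_segment_address(seg), segsize);
    shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, databaseTarget);
    shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, usernameTarget);
    shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_COMMAND, commandTarget);
    shm_toc_insert(toc, CITUS_BACKGROUND_JOB_KEY_QUEUE, mq);
    worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));

    /* executor side: attach to the same segment and read the same keys back */
    dsm_segment *seg = dsm_attach(DatumGetInt32(main_arg));
    shm_toc *toc = shm_toc_attach(CITUS_BACKGROUND_JOB_MAGIC, dsm_segment_address(seg));
    char *database = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_DATABASE, false);
    char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_USERNAME, false);
    char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_JOB_KEY_COMMAND, false);
    BackgroundWorkerInitializeConnection(database, username, 0);
    ExecuteSqlString(command);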
@@ -33,6 +33,7 @@
 #include "commands/extension.h"
 #include "libpq/pqsignal.h"
 #include "catalog/namespace.h"
+#include "distributed/background_jobs.h"
 #include "distributed/citus_safe_lib.h"
 #include "distributed/distributed_deadlock_detection.h"
 #include "distributed/maintenanced.h"
@@ -97,7 +98,6 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0;
 int Recover2PCInterval = 60000;
 int DeferShardDeleteInterval = 15000;
 int RebalanceCheckInterval = 1000;
-bool RebalanceJobDebugDelay = false;
 
 /* config variables for metadata sync timeout */
 int MetadataSyncInterval = 60000;
@@ -124,9 +124,6 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg);
 static void MaintenanceDaemonErrorContext(void *arg);
 static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
 static void WarnMaintenanceDaemonNotStarted(void);
-static BackgroundWorkerHandle * StartRebalanceJobsBackgroundWorker(Oid database,
-                                                                   Oid extensionOwner);
-static bool ExecuteRebalanceJob(RebalanceJob *job);
 
 /*
  * InitializeMaintenanceDaemon, called at server start, is responsible for
@@ -733,8 +730,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
                                 "Starting background worker for execution.")));
 
                 rebalanceBgwHandle =
-                    StartRebalanceJobsBackgroundWorker(MyDatabaseId,
+                    StartCitusBackgroundJobWorker(MyDatabaseId,
                                                   myDbData->userOid);
 
                 if (!rebalanceBgwHandle ||
                     GetBackgroundWorkerPid(rebalanceBgwHandle, &rebalanceWorkerPid) ==
@@ -800,203 +797,6 @@ CitusMaintenanceDaemonMain(Datum main_arg)
 }
 
 
-static BackgroundWorkerHandle *
-StartRebalanceJobsBackgroundWorker(Oid database, Oid extensionOwner)
-{
-    BackgroundWorker worker;
-    BackgroundWorkerHandle *handle = NULL;
-
-    /* Configure a worker. */
-    memset(&worker, 0, sizeof(worker));
-    SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
-                 "Citus Rebalance Jobs Worker: %u/%u",
-                 database, extensionOwner);
-    worker.bgw_flags =
-        BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
-    worker.bgw_start_time = BgWorkerStart_ConsistentState;
-
-    /* don't restart, we manage restarts from maintenance daemon */
-    worker.bgw_restart_time = BGW_NEVER_RESTART;
-    strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
-    strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
-             "RebalanceJobsBackgroundWorkerMain");
-    worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
-    memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
-             sizeof(Oid));
-    worker.bgw_notify_pid = MyProcPid;
-
-    if (!RegisterDynamicBackgroundWorker(&worker, &handle))
-    {
-        return NULL;
-    }
-
-    pid_t pid;
-    WaitForBackgroundWorkerStartup(handle, &pid);
-
-    return handle;
-}
-
-
-void
-RebalanceJobsBackgroundWorkerMain(Datum arg)
-{
-    Oid databaseOid = DatumGetObjectId(arg);
-
-    /* extension owner is passed via bgw_extra */
-    Oid extensionOwner = InvalidOid;
-    memcpy_s(&extensionOwner, sizeof(extensionOwner),
-             MyBgworkerEntry->bgw_extra, sizeof(Oid));
-
-    BackgroundWorkerUnblockSignals();
-
-    /* connect to database, after that we can actually access catalogs */
-    BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
-
-    /* make worker recognizable in pg_stat_activity */
-    pgstat_report_appname("rebalance jobs worker");
-
-    ereport(LOG, (errmsg("background jobs runner")));
-
-    if (RebalanceJobDebugDelay)
-    {
-        pg_usleep(30 * 1000 * 1000);
-    }
-
-    MemoryContext perJobContext = AllocSetContextCreateExtended(CurrentMemoryContext,
-                                                                "PerJobContext",
-                                                                ALLOCSET_DEFAULT_MINSIZE,
-                                                                ALLOCSET_DEFAULT_INITSIZE,
-                                                                ALLOCSET_DEFAULT_MAXSIZE);
-
-    /*
-     * First we find all jobs that are running, we need to check if they are still running
-     * if not reset their state back to scheduled.
-     */
-    {
-        StartTransactionCommand();
-        PushActiveSnapshot(GetTransactionSnapshot());
-
-        /* TODO have an actual function to check if the worker is still running */
-        ResetRunningJobs();
-
-        PopActiveSnapshot();
-        CommitTransactionCommand();
-    }
-
-
-    MemoryContext oldContextPerJob = MemoryContextSwitchTo(perJobContext);
-    bool hasJobs = true;
-    while (hasJobs)
-    {
-        MemoryContextReset(perJobContext);
-
-        CHECK_FOR_INTERRUPTS();
-
-        InvalidateMetadataSystemCache();
-        StartTransactionCommand();
-
-        PushActiveSnapshot(GetTransactionSnapshot());
-
-        if (!LockCitusExtension())
-        {
-            ereport(DEBUG1, (errmsg("could not lock the citus extension, "
-                                    "skipping metadata sync")));
-        }
-        else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
-        {
-            /*
-             * We need to load the job into the perJobContext as we will switch contexts
-             * later due to the committing and starting of new transactions
-             */
-            MemoryContext oldContext = MemoryContextSwitchTo(perJobContext);
-            RebalanceJob *job = GetRunableRebalanceJob();
-            MemoryContextSwitchTo(oldContext);
-
-            if (!job)
-            {
-                PopActiveSnapshot();
-                CommitTransactionCommand();
-
-                hasJobs = false;
-                break;
-            }
-
-            ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid)));
-
-            /* Update job status to indicate it is running */
-            UpdateJobStatus(job, REBALANCE_JOB_STATUS_RUNNING);
-
-            PopActiveSnapshot();
-            CommitTransactionCommand();
-
-            MemoryContext savedContext = CurrentMemoryContext;
-            PG_TRY();
-            {
-                StartTransactionCommand();
-                PushActiveSnapshot(GetTransactionSnapshot());
-                if (ExecuteRebalanceJob(job))
-                {
-                    UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE);
-
-                    PopActiveSnapshot();
-                    CommitTransactionCommand();
-                }
-            }
-            PG_CATCH();
-            {
-                MemoryContextSwitchTo(savedContext);
-
-                ErrorData *edata = CopyErrorData();
-                FlushErrorState();
-
-                StartTransactionCommand();
-                PushActiveSnapshot(GetTransactionSnapshot());
-
-                UpdateJobError(job, edata);
-
-                PopActiveSnapshot();
-                CommitTransactionCommand();
-
-                FreeErrorData(edata);
-                edata = NULL;
-
-                /* TODO log that there was an error */
-            }
-            PG_END_TRY();
-        }
-    }
-
-    MemoryContextSwitchTo(oldContextPerJob);
-    MemoryContextDelete(perJobContext);
-}
-
-
-static bool
-ExecuteRebalanceJob(RebalanceJob *job)
-{
-    int spiResult = SPI_connect();
-    if (spiResult != SPI_OK_CONNECT)
-    {
-        ereport(ERROR, (errmsg("could not connect to SPI manager")));
-    }
-
-    spiResult = SPI_execute(job->command, false, 0);
-
-    /* if (spiResult != SPIOK) */
-    /* { */
-    /* ereport(ERROR, (errmsg("could not run SPI query"))); */
-    /* } */
-
-    spiResult = SPI_finish();
-    if (spiResult != SPI_OK_FINISH)
-    {
-        ereport(ERROR, (errmsg("could not finish SPI connection")));
-    }
-
-    return true;
-}
-
-
 /*
  * MaintenanceDaemonShmemSize computes how much shared memory is required.
  */
@@ -0,0 +1,15 @@
+#ifndef CITUS_BACKGROUND_JOBS_H
+#define CITUS_BACKGROUND_JOBS_H
+
+#import "postgres.h"
+
+#import "postmaster/bgworker.h"
+
+extern BackgroundWorkerHandle * StartCitusBackgroundJobWorker(Oid database, Oid
+                                                              extensionOwner);
+extern void CitusBackgroundJobMain(Datum arg);
+extern void CitusBackgroundJobExecuter(Datum main_arg);
+
+extern bool RebalanceJobDebugDelay;
+
+#endif /*CITUS_BACKGROUND_JOBS_H */
@@ -30,6 +30,5 @@ extern void InitializeMaintenanceDaemonBackend(void);
 extern bool LockCitusExtension(void);
 
 extern void CitusMaintenanceDaemonMain(Datum main_arg);
-extern void RebalanceJobsBackgroundWorkerMain(Datum arg);
 
 #endif /* MAINTENANCED_H */
@@ -338,7 +338,8 @@ extern bool JobHasUmnetDependencies(int64 jobid);
 extern RebalanceJob * GetRunableRebalanceJob(void);
 extern void ResetRunningJobs(void);
 extern RebalanceJob * GetScheduledRebalanceJobByJobID(int64 jobId);
-extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);
+extern void UpdateJobStatus(int64 jobid, pid_t *pid, RebalanceJobStatus status,
+                            int32 *retry_count, char *message);
 extern bool UpdateJobError(RebalanceJob *job, ErrorData *edata);
 extern void UnscheduleDependantJobs(int64 jobid);
 extern bool IsRebalanceJobStatusTerminal(RebalanceJobStatus status);
@@ -14,7 +14,6 @@
 /* GUC to configure deferred shard deletion */
 extern int DeferShardDeleteInterval;
 extern int RebalanceCheckInterval;
-extern bool RebalanceJobDebugDelay;
 extern bool DeferShardDeleteOnMove;
 extern double DesiredPercentFreeAfterMove;
 extern bool CheckAvailableSpaceBeforeMove;
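Together with the earlier hunks, this moves the RebalanceJobDebugDelay flag out of the maintenance daemon and shard_cleaner.h and into the new background jobs module. The GUC registration that would make the flag settable from SQL is not part of this diff; a conventional registration, with a purely hypothetical GUC name, could look roughly like this:

    #include "utils/guc.h"

    static void
    RegisterBackgroundJobDebugDelayGuc(void)
    {
        /* "citus.background_job_debug_delay" is an assumed name, not taken from this commit */
        DefineCustomBoolVariable("citus.background_job_debug_delay",
                                 gettext_noop("Waits 30 seconds before the background "
                                              "jobs worker starts picking up jobs, to "
                                              "ease attaching a debugger."),
                                 NULL,
                                 &RebalanceJobDebugDelay,
                                 false,
                                 PGC_SIGHUP,
                                 GUC_NO_SHOW_ALL,
                                 NULL, NULL, NULL);
    }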