run task based on sql command, log messages and simple retry

background-job-details
Nils Dijk 2022-07-08 18:04:36 +02:00 committed by Jelte Fennema
parent 3365771162
commit 12b0063c31
8 changed files with 196 additions and 139 deletions

View File

@ -1991,10 +1991,11 @@ ExecuteQueryViaSPI(char *query, int SPIOK)
}
spiResult = SPI_execute(query, false, 0);
if (spiResult != SPIOK)
{
ereport(ERROR, (errmsg("could not run SPI query")));
}
/* if (spiResult != SPIOK) */
/* { */
/* ereport(ERROR, (errmsg("could not run SPI query"))); */
/* } */
spiResult = SPI_finish();
if (spiResult != SPI_OK_FINISH)

View File

@ -146,6 +146,7 @@ typedef struct MetadataCacheData
Oid distRebalanceJobsStatusJobsIndexId;
Oid jobStatusScheduledId;
Oid jobStatusDoneId;
Oid jobStatusErrorId;
Oid distRebalanceStrategyRelationId;
Oid distNodeRelationId;
Oid distNodeNodeIdIndexId;
@ -3147,6 +3148,19 @@ JobStatusDoneId(void)
}
Oid
JobStatusErrorId(void)
{
if (!MetadataCache.jobStatusErrorId)
{
MetadataCache.jobStatusErrorId =
LookupStringEnumValueId("citus_job_status", "error");
}
return MetadataCache.jobStatusErrorId;
}
/*
* citus_dist_partition_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_partition are changed

View File

@ -2292,66 +2292,6 @@ RebalanceJobStatusOid(RebalanceJobStatus status)
}
static void
ParseMoveJob(RebalanceJob *target, Datum moveArgs)
{
HeapTupleHeader t = DatumGetHeapTupleHeader(moveArgs);
bool isnull = false;
Datum shardIdDatum = GetAttributeByName(t, "shard_id", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"shard_id for citus_move_shard_placement_arguments "
"can't be null")));
}
uint64 shardId = DatumGetUInt64(shardIdDatum);
Datum sourceNodeNameDatum = GetAttributeByName(t, "source_node_name", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"source_node_name for citus_move_shard_placement_arguments "
"can't be null")));
}
text *sourceNodeNameText = DatumGetTextP(sourceNodeNameDatum);
Datum sourceNodePortDatum = GetAttributeByName(t, "source_node_port", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"source_node_port for citus_move_shard_placement_arguments "
"can't be null")));
}
int32 sourceNodePort = DatumGetInt32(sourceNodePortDatum);
Datum targetNodeNameDatum = GetAttributeByName(t, "target_node_name", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"target_node_name for citus_move_shard_placement_arguments "
"can't be null")));
}
text *targetNodeNameText = DatumGetTextP(targetNodeNameDatum);
Datum targetNodePortDatum = GetAttributeByName(t, "target_node_port", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg(
"target_node_port for citus_move_shard_placement_arguments "
"can't be null")));
}
int32 targetNodePort = DatumGetInt32(targetNodePortDatum);
target->jobType = REBALANCE_JOB_TYPE_MOVE;
target->jobArguments.move.shardId = shardId;
target->jobArguments.move.sourceName = text_to_cstring(sourceNodeNameText);
target->jobArguments.move.sourcePort = sourceNodePort;
target->jobArguments.move.targetName = text_to_cstring(targetNodeNameText);
target->jobArguments.move.targetPort = targetNodePort;
}
RebalanceJob *
GetScheduledRebalanceJob(void)
{
@ -2387,17 +2327,8 @@ GetScheduledRebalanceJob(void)
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
heap_deform_tuple(jobTuple, tupleDescriptor, datumArray, isNullArray);
if (!isNullArray[Anum_pg_dist_rebalance_jobs_citus_move_shard_placement - 1])
{
/* citus_move_shard_placement job */
ParseMoveJob(
job,
datumArray[Anum_pg_dist_rebalance_jobs_citus_move_shard_placement - 1]);
}
else
{
ereport(ERROR, (errmsg("undefined job type")));
}
job->command = text_to_cstring(
DatumGetTextP(datumArray[Anum_pg_dist_rebalance_jobs_command - 1]));
}
systable_endscan(scanDescriptor);
@ -2452,3 +2383,114 @@ UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus)
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, NoLock);
}
void
UpdateJobError(RebalanceJob *job, ErrorData *edata)
{
Relation pgDistRebalanceJobs = table_open(DistRebalanceJobsRelationId(),
RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistRebalanceJobs);
ScanKeyData scanKey[1];
int scanKeyCount = 1;
/* WHERE jobid = job->jobid */
ScanKeyInit(&scanKey[0], Anum_pg_dist_rebalance_jobs_jobid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(job->jobid));
const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistRebalanceJobs,
DistRebalanceJobsJobsIdIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find rebalance job entry for jobid: "
UINT64_FORMAT, job->jobid)));
}
Datum values[Natts_pg_dist_rebalance_jobs] = { 0 };
bool isnull[Natts_pg_dist_rebalance_jobs] = { 0 };
bool replace[Natts_pg_dist_rebalance_jobs] = { 0 };
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
/* increment retry count */
int retryCount = 0;
if (!isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1])
{
retryCount = DatumGetInt32(values[Anum_pg_dist_rebalance_jobs_retry_count - 1]);
retryCount++;
}
values[Anum_pg_dist_rebalance_jobs_retry_count - 1] = Int32GetDatum(retryCount);
isnull[Anum_pg_dist_rebalance_jobs_retry_count - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_retry_count - 1] = true;
if (retryCount >= 3)
{
/* after 3 failures we will transition the job to error and stop executing */
values[Anum_pg_dist_rebalance_jobs_status - 1] =
ObjectIdGetDatum(JobStatusErrorId());
isnull[Anum_pg_dist_rebalance_jobs_status - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_status - 1] = true;
}
StringInfoData buf = { 0 };
initStringInfo(&buf);
if (edata->message)
{
if (buf.len > 0)
{
appendStringInfo(&buf, "\n");
}
appendStringInfoString(&buf, "ERROR: ");
appendStringInfoString(&buf, edata->message);
}
if (edata->hint)
{
if (buf.len > 0)
{
appendStringInfo(&buf, "\n");
}
appendStringInfoString(&buf, "HINT: ");
appendStringInfoString(&buf, edata->hint);
}
if (edata->detail)
{
if (buf.len > 0)
{
appendStringInfo(&buf, "\n");
}
appendStringInfoString(&buf, "DETAIL: ");
appendStringInfoString(&buf, edata->detail);
}
if (edata->context)
{
if (buf.len > 0)
{
appendStringInfo(&buf, "\n");
}
appendStringInfoString(&buf, "CONTEXT: ");
appendStringInfoString(&buf, edata->context);
}
values[Anum_pg_dist_rebalance_jobs_message - 1] = CStringGetTextDatum(buf.data);
isnull[Anum_pg_dist_rebalance_jobs_message - 1] = false;
replace[Anum_pg_dist_rebalance_jobs_message - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistRebalanceJobs, &heapTuple->t_self, heapTuple);
CommandCounterIncrement();
systable_endscan(scanDescriptor);
table_close(pgDistRebalanceJobs, NoLock);
}

View File

@ -77,25 +77,17 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"
CREATE TYPE citus.citus_move_shard_placement_arguments AS (
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode
);
ALTER TYPE citus.citus_move_shard_placement_arguments SET SCHEMA pg_catalog;
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'done');
CREATE TYPE citus.citus_job_status AS ENUM ('scheduled', 'done', 'error');
ALTER TYPE citus.citus_job_status SET SCHEMA pg_catalog;
CREATE TABLE citus.pg_dist_rebalance_jobs(
jobid bigserial NOT NULL,
status pg_catalog.citus_job_status default 'scheduled',
citus_move_shard_placement pg_catalog.citus_move_shard_placement_arguments
status pg_catalog.citus_job_status default 'scheduled' NOT NULL,
command text NOT NULL,
retry_count integer,
message text
);
-- SELECT granted to PUBLIC in upgrade script
ALTER TABLE citus.pg_dist_rebalance_jobs SET SCHEMA pg_catalog;
CREATE UNIQUE INDEX pg_dist_rebalance_jobs_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(jobid);
CREATE INDEX pg_dist_rebalance_jobs_status_jobid_index ON pg_catalog.pg_dist_rebalance_jobs using btree(status, jobid);

View File

@ -19,6 +19,7 @@
#include "distributed/pg_version_constants.h"
#include <time.h>
#include <executor/spi.h>
#include "miscadmin.h"
#include "pgstat.h"
@ -855,6 +856,8 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
ereport(LOG, (errmsg("background jobs runner")));
/* pg_usleep(30 * 1000 * 1000); */
bool hasJobs = true;
while (hasJobs)
{
@ -876,11 +879,35 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
if (job)
{
ereport(LOG, (errmsg("found job with jobid: %ld", job->jobid)));
MemoryContext savedContext = CurrentMemoryContext;
BeginInternalSubTransaction(NULL);
if (ExecuteRebalanceJob(job))
PG_TRY();
{
UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE);
if (ExecuteRebalanceJob(job))
{
UpdateJobStatus(job, REBALANCE_JOB_STATUS_DONE);
}
ReleaseCurrentSubTransaction();
}
PG_CATCH();
{
MemoryContextSwitchTo(savedContext);
ErrorData *edata = CopyErrorData();
FlushErrorState();
RollbackAndReleaseCurrentSubTransaction();
UpdateJobError(job, edata);
FreeErrorData(edata);
edata = NULL;
/* TODO log that there was an error */
}
PG_END_TRY();
}
else
{
@ -895,37 +922,29 @@ RebalanceJobsBackgroundWorkerMain(Datum arg)
}
static bool
ExecuteMoveRebalanceJob(RebalanceJob *job)
{
DirectFunctionCall6Coll(citus_move_shard_placement, InvalidOid,
Int64GetDatum(job->jobArguments.move.shardId),
CStringGetTextDatum(job->jobArguments.move.sourceName),
Int32GetDatum(job->jobArguments.move.sourcePort),
CStringGetTextDatum(job->jobArguments.move.targetName),
Int32GetDatum(job->jobArguments.move.targetPort),
ObjectIdGetDatum(16598)); /* fix hardcoded value of 'block_writes' */
return true;
}
static bool
ExecuteRebalanceJob(RebalanceJob *job)
{
switch (job->jobType)
int spiResult = SPI_connect();
if (spiResult != SPI_OK_CONNECT)
{
case REBALANCE_JOB_TYPE_MOVE:
{
return ExecuteMoveRebalanceJob(job);
}
default:
{
ereport(ERROR, (errmsg("undefined rebalance job for jobid: %ld",
job->jobid)));
}
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
spiResult = SPI_execute(job->command, false, 0);
/* if (spiResult != SPIOK) */
/* { */
/* ereport(ERROR, (errmsg("could not run SPI query"))); */
/* } */
spiResult = SPI_finish();
if (spiResult != SPI_OK_FINISH)
{
ereport(ERROR, (errmsg("could not finish SPI connection")));
}
return true;
}

View File

@ -271,8 +271,9 @@ extern Oid SecondaryNodeRoleId(void);
extern Oid CitusCopyFormatTypeId(void);
extern Oid TextCopyFormatId(void);
extern Oid BinaryCopyFormatId(void);
extern Oid JobStatusDoneId(void);
extern Oid JobStatusScheduledId(void);
extern Oid JobStatusDoneId(void);
extern Oid JobStatusErrorId(void);
/* user related functions */
extern Oid CitusExtensionOwner(void);

View File

@ -211,29 +211,12 @@ typedef enum RebalanceJobStatus
REBALANCE_JOB_STATUS_DONE
} RebalanceJobStatus;
typedef enum RebalanceJobType
{
REBALANCE_JOB_TYPE_UNKNOWN,
REBALANCE_JOB_TYPE_MOVE
} RebalanceJobType;
typedef struct RebalanceJob
{
int64 jobid;
RebalanceJobStatus status;
RebalanceJobType jobType;
union
{
struct
{
uint32 shardId;
char *sourceName;
int32 sourcePort;
char *targetName;
int32 targetPort;
} move;
} jobArguments;
char *command;
} RebalanceJob;
@ -347,4 +330,5 @@ extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasScheduledRebalanceJobs(void);
extern RebalanceJob * GetScheduledRebalanceJob(void);
extern void UpdateJobStatus(RebalanceJob *job, RebalanceJobStatus newStatus);
extern void UpdateJobError(RebalanceJob *job, ErrorData *edata);
#endif /* METADATA_UTILITY_H */

View File

@ -11,7 +11,9 @@ typedef struct FormData_pg_dist_rebalance_job
int64 jobid;
Oid status;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text citus_move_shard_placement; /* text? we need to understand how to read a variable length stored custon type */
text command;
int32 retry_count;
text message;
#endif
} FormData_pg_dist_rebalance_job;
@ -26,9 +28,11 @@ typedef FormData_pg_dist_rebalance_job *Form_pg_dist_rebalance_job;
* compiler constants for pg_dist_rebalance_jobs
* ----------------
*/
#define Natts_pg_dist_rebalance_jobs 3
#define Natts_pg_dist_rebalance_jobs 5
#define Anum_pg_dist_rebalance_jobs_jobid 1
#define Anum_pg_dist_rebalance_jobs_status 2
#define Anum_pg_dist_rebalance_jobs_citus_move_shard_placement 3
#define Anum_pg_dist_rebalance_jobs_command 3
#define Anum_pg_dist_rebalance_jobs_retry_count 4
#define Anum_pg_dist_rebalance_jobs_message 5
#endif /* CITUS_PG_DIST_REBALANCE_JOBS_H */