mirror of https://github.com/citusdata/citus.git
* add SIGTERM handler to gracefully terminate task executors (#6473)
Adds signal handlers for graceful termination and cancellation of task executors, and for detecting config updates. Related to PR #6459. #### How to handle termination signal? The monitor needs to gracefully terminate all running task executors before terminating. Hence, we have a SIGTERM handler for the monitor. #### How to handle cancellation signal? The monitor needs to gracefully cancel all running task executors before terminating. Hence, we have a SIGINT handler for the monitor. #### How to detect configuration changes? The monitor has a SIGHUP handler to reflect configuration changes while executing tasks. pull/6508/head
parent
6781ace3a1
commit
65f256eec4
|
@ -72,12 +72,14 @@
|
||||||
#define CITUS_BACKGROUND_TASK_KEY_COMMAND 2
|
#define CITUS_BACKGROUND_TASK_KEY_COMMAND 2
|
||||||
#define CITUS_BACKGROUND_TASK_KEY_QUEUE 3
|
#define CITUS_BACKGROUND_TASK_KEY_QUEUE 3
|
||||||
#define CITUS_BACKGROUND_TASK_KEY_TASK_ID 4
|
#define CITUS_BACKGROUND_TASK_KEY_TASK_ID 4
|
||||||
#define CITUS_BACKGROUND_TASK_NKEYS 5
|
#define CITUS_BACKGROUND_TASK_KEY_JOB_ID 5
|
||||||
|
#define CITUS_BACKGROUND_TASK_NKEYS 6
|
||||||
|
|
||||||
static BackgroundWorkerHandle * StartCitusBackgroundTaskExecutor(char *database,
|
static BackgroundWorkerHandle * StartCitusBackgroundTaskExecutor(char *database,
|
||||||
char *user,
|
char *user,
|
||||||
char *command,
|
char *command,
|
||||||
int64 taskId,
|
int64 taskId,
|
||||||
|
int64 jobId,
|
||||||
dsm_segment **pSegment);
|
dsm_segment **pSegment);
|
||||||
static void ExecuteSqlString(const char *sql);
|
static void ExecuteSqlString(const char *sql);
|
||||||
static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message,
|
static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message,
|
||||||
|
@ -106,6 +108,18 @@ static TaskExecutionStatus ConsumeExecutorQueue(
|
||||||
TaskExecutionContext *taskExecutionContext);
|
TaskExecutionContext *taskExecutionContext);
|
||||||
static void TaskHadError(TaskExecutionContext *taskExecutionContext);
|
static void TaskHadError(TaskExecutionContext *taskExecutionContext);
|
||||||
static void TaskEnded(TaskExecutionContext *taskExecutionContext);
|
static void TaskEnded(TaskExecutionContext *taskExecutionContext);
|
||||||
|
static void TerminateAllTaskExecutors(HTAB *currentExecutors);
|
||||||
|
static HTAB * GetRunningUniqueJobIds(HTAB *currentExecutors);
|
||||||
|
static void CancelAllTaskExecutors(HTAB *currentExecutors);
|
||||||
|
static bool MonitorGotTerminationOrCancellationRequest();
|
||||||
|
static void QueueMonitorSigTermHandler(SIGNAL_ARGS);
|
||||||
|
static void QueueMonitorSigIntHandler(SIGNAL_ARGS);
|
||||||
|
static void QueueMonitorSigHupHandler(SIGNAL_ARGS);
|
||||||
|
|
||||||
|
/* flags set by signal handlers */
|
||||||
|
static volatile sig_atomic_t GotSigterm = false;
|
||||||
|
static volatile sig_atomic_t GotSigint = false;
|
||||||
|
static volatile sig_atomic_t GotSighup = false;
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(citus_job_cancel);
|
PG_FUNCTION_INFO_V1(citus_job_cancel);
|
||||||
PG_FUNCTION_INFO_V1(citus_job_wait);
|
PG_FUNCTION_INFO_V1(citus_job_wait);
|
||||||
|
@ -337,8 +351,6 @@ CitusBackgroundTaskQueueMonitorErrorCallback(void *arg)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NewExecutorExceedsCitusLimit returns true if currently we reached Citus' max worker count.
|
* NewExecutorExceedsCitusLimit returns true if currently we reached Citus' max worker count.
|
||||||
* It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit
|
|
||||||
* that limit again.
|
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecutionContext)
|
NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecutionContext)
|
||||||
|
@ -372,8 +384,6 @@ NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecution
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NewExecutorExceedsPgMaxWorkers returns true if currently we reached Postgres' max worker count.
|
* NewExecutorExceedsPgMaxWorkers returns true if currently we reached Postgres' max worker count.
|
||||||
* It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit
|
|
||||||
* that limit again.
|
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
|
NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
|
||||||
|
@ -389,9 +399,8 @@ NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
|
||||||
*/
|
*/
|
||||||
if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0)
|
if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0)
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg(
|
ereport(WARNING, (errmsg("unable to start background worker for "
|
||||||
"unable to start background worker for "
|
"background task execution"),
|
||||||
"background task execution"),
|
|
||||||
errdetail(
|
errdetail(
|
||||||
"Current number of task "
|
"Current number of task "
|
||||||
"executors: %ld/%d",
|
"executors: %ld/%d",
|
||||||
|
@ -432,7 +441,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
|
||||||
dsm_segment *seg = NULL;
|
dsm_segment *seg = NULL;
|
||||||
BackgroundWorkerHandle *handle =
|
BackgroundWorkerHandle *handle =
|
||||||
StartCitusBackgroundTaskExecutor(databaseName, userName, runnableTask->command,
|
StartCitusBackgroundTaskExecutor(databaseName, userName, runnableTask->command,
|
||||||
runnableTask->taskid, &seg);
|
runnableTask->taskid, runnableTask->jobid, &seg);
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
if (NewExecutorExceedsPgMaxWorkers(handle, queueMonitorExecutionContext))
|
if (NewExecutorExceedsPgMaxWorkers(handle, queueMonitorExecutionContext))
|
||||||
|
@ -450,6 +459,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
|
||||||
Assert(!handleEntryFound);
|
Assert(!handleEntryFound);
|
||||||
handleEntry->handle = handle;
|
handleEntry->handle = handle;
|
||||||
handleEntry->seg = seg;
|
handleEntry->seg = seg;
|
||||||
|
handleEntry->jobid = runnableTask->jobid;
|
||||||
|
|
||||||
/* reset worker allocation timestamp and log time elapsed since the last failure */
|
/* reset worker allocation timestamp and log time elapsed since the last failure */
|
||||||
CheckAndResetLastWorkerAllocationFailure(queueMonitorExecutionContext);
|
CheckAndResetLastWorkerAllocationFailure(queueMonitorExecutionContext);
|
||||||
|
@ -741,6 +751,138 @@ TaskEnded(TaskExecutionContext *taskExecutionContext)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* QueueMonitorSigHupHandler handles SIGHUP by setting a flag so that the monitor process reloads its config params. It also sets the latch to wake the monitor if it waits on it.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
QueueMonitorSigHupHandler(SIGNAL_ARGS)
|
||||||
|
{
|
||||||
|
int saved_errno = errno;
|
||||||
|
|
||||||
|
GotSighup = true;
|
||||||
|
|
||||||
|
if (MyProc)
|
||||||
|
{
|
||||||
|
SetLatch(&MyProc->procLatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
errno = saved_errno;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MonitorGotTerminationOrCancellationRequest returns true if the monitor has received a SIGTERM or SIGINT signal.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
MonitorGotTerminationOrCancellationRequest()
|
||||||
|
{
|
||||||
|
return GotSigterm || GotSigint;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* QueueMonitorSigTermHandler handles SIGTERM by setting a flag to inform the monitor process
|
||||||
|
* so that it can terminate active task executors properly. It also sets the latch to awake the
|
||||||
|
* monitor if it waits on it.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
QueueMonitorSigTermHandler(SIGNAL_ARGS)
|
||||||
|
{
|
||||||
|
int saved_errno = errno;
|
||||||
|
|
||||||
|
GotSigterm = true;
|
||||||
|
|
||||||
|
if (MyProc)
|
||||||
|
{
|
||||||
|
SetLatch(&MyProc->procLatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
errno = saved_errno;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* QueueMonitorSigIntHandler handles SIGINT by setting a flag to inform the monitor process
|
||||||
|
* so that it can cancel active task executors properly. It also sets the latch to awake the
|
||||||
|
* monitor if it waits on it.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
QueueMonitorSigIntHandler(SIGNAL_ARGS)
|
||||||
|
{
|
||||||
|
int saved_errno = errno;
|
||||||
|
|
||||||
|
GotSigint = true;
|
||||||
|
|
||||||
|
if (MyProc)
|
||||||
|
{
|
||||||
|
SetLatch(&MyProc->procLatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
errno = saved_errno;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TerminateAllTaskExecutors terminates task executors given in the hash map.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
TerminateAllTaskExecutors(HTAB *currentExecutors)
|
||||||
|
{
|
||||||
|
HASH_SEQ_STATUS status;
|
||||||
|
BackgroundExecutorHashEntry *backgroundExecutorHashEntry;
|
||||||
|
foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors)
|
||||||
|
{
|
||||||
|
TerminateBackgroundWorker(backgroundExecutorHashEntry->handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetRunningUniqueJobIds returns unique job ids from currentExecutors
|
||||||
|
*/
|
||||||
|
static HTAB *
|
||||||
|
GetRunningUniqueJobIds(HTAB *currentExecutors)
|
||||||
|
{
|
||||||
|
/* create a set to store unique job ids for currently executing tasks */
|
||||||
|
HTAB *uniqueJobIds = CreateSimpleHashSetWithSize(int64, MAX_BG_TASK_EXECUTORS);
|
||||||
|
|
||||||
|
HASH_SEQ_STATUS status;
|
||||||
|
BackgroundExecutorHashEntry *backgroundExecutorHashEntry;
|
||||||
|
foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors)
|
||||||
|
{
|
||||||
|
hash_search(uniqueJobIds, &backgroundExecutorHashEntry->jobid, HASH_ENTER, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
return uniqueJobIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CancelAllTaskExecutors cancels task executors given in the hash map.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CancelAllTaskExecutors(HTAB *currentExecutors)
|
||||||
|
{
|
||||||
|
StartTransactionCommand();
|
||||||
|
PushActiveSnapshot(GetTransactionSnapshot());
|
||||||
|
|
||||||
|
/* get unique job id set for running tasks in currentExecutors */
|
||||||
|
HTAB *uniqueJobIds = GetRunningUniqueJobIds(currentExecutors);
|
||||||
|
|
||||||
|
HASH_SEQ_STATUS status;
|
||||||
|
int64 *uniqueJobId;
|
||||||
|
foreach_htab(uniqueJobId, &status, uniqueJobIds)
|
||||||
|
{
|
||||||
|
ereport(DEBUG1, (errmsg("cancelling job: %ld", *uniqueJobId)));
|
||||||
|
Datum jobidDatum = Int64GetDatum(*uniqueJobId);
|
||||||
|
DirectFunctionCall1(citus_job_cancel, jobidDatum);
|
||||||
|
}
|
||||||
|
|
||||||
|
PopActiveSnapshot();
|
||||||
|
CommitTransactionCommand();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CitusBackgroundTaskQueueMonitorMain is the main entry point for the background worker
|
* CitusBackgroundTaskQueueMonitorMain is the main entry point for the background worker
|
||||||
* running the background tasks queue monitor.
|
* running the background tasks queue monitor.
|
||||||
|
@ -758,6 +900,18 @@ TaskEnded(TaskExecutionContext *taskExecutionContext)
|
||||||
void
|
void
|
||||||
CitusBackgroundTaskQueueMonitorMain(Datum arg)
|
CitusBackgroundTaskQueueMonitorMain(Datum arg)
|
||||||
{
|
{
|
||||||
|
/* handle SIGTERM to properly terminate active task executors */
|
||||||
|
pqsignal(SIGTERM, QueueMonitorSigTermHandler);
|
||||||
|
|
||||||
|
/* handle SIGINT to properly cancel active task executors */
|
||||||
|
pqsignal(SIGINT, QueueMonitorSigIntHandler);
|
||||||
|
|
||||||
|
/* handle SIGHUP to update MaxBackgroundTaskExecutors */
|
||||||
|
pqsignal(SIGHUP, QueueMonitorSigHupHandler);
|
||||||
|
|
||||||
|
/* ready to handle signals */
|
||||||
|
BackgroundWorkerUnblockSignals();
|
||||||
|
|
||||||
Oid databaseOid = DatumGetObjectId(arg);
|
Oid databaseOid = DatumGetObjectId(arg);
|
||||||
|
|
||||||
/* extension owner is passed via bgw_extra */
|
/* extension owner is passed via bgw_extra */
|
||||||
|
@ -765,8 +919,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
|
||||||
memcpy_s(&extensionOwner, sizeof(extensionOwner),
|
memcpy_s(&extensionOwner, sizeof(extensionOwner),
|
||||||
MyBgworkerEntry->bgw_extra, sizeof(Oid));
|
MyBgworkerEntry->bgw_extra, sizeof(Oid));
|
||||||
|
|
||||||
BackgroundWorkerUnblockSignals();
|
|
||||||
|
|
||||||
/* connect to database, after that we can actually access catalogs */
|
/* connect to database, after that we can actually access catalogs */
|
||||||
BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
|
BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
|
||||||
|
|
||||||
|
@ -807,10 +959,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
|
||||||
* cause conflicts on processing the tasks in the catalog table as well as violate
|
* cause conflicts on processing the tasks in the catalog table as well as violate
|
||||||
* parallelism guarantees. To make sure there is at most, exactly one backend running
|
* parallelism guarantees. To make sure there is at most, exactly one backend running
|
||||||
* we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation.
|
* we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation.
|
||||||
*
|
|
||||||
* TODO now that we have a lock, we should install a term handler to terminate any
|
|
||||||
* 'child' backend when we are terminated. Otherwise we will still have a situation
|
|
||||||
* where the actual task could be running multiple times.
|
|
||||||
*/
|
*/
|
||||||
LOCKTAG tag = { 0 };
|
LOCKTAG tag = { 0 };
|
||||||
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR);
|
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR);
|
||||||
|
@ -841,11 +989,13 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
|
||||||
PopActiveSnapshot();
|
PopActiveSnapshot();
|
||||||
CommitTransactionCommand();
|
CommitTransactionCommand();
|
||||||
|
|
||||||
/* create a map to store parallel task executors */
|
/* create a map to store parallel task executors. Persist it in monitor memory context */
|
||||||
|
oldContext = MemoryContextSwitchTo(backgroundTaskContext);
|
||||||
HTAB *currentExecutors = CreateSimpleHashWithNameAndSize(int64,
|
HTAB *currentExecutors = CreateSimpleHashWithNameAndSize(int64,
|
||||||
BackgroundExecutorHashEntry,
|
BackgroundExecutorHashEntry,
|
||||||
"Background Executor Hash",
|
"Background Executor Hash",
|
||||||
MAX_BG_TASK_EXECUTORS);
|
MAX_BG_TASK_EXECUTORS);
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* monitor execution context that is useful during the monitor loop.
|
* monitor execution context that is useful during the monitor loop.
|
||||||
|
@ -861,6 +1011,10 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
|
||||||
.ctx = backgroundTaskContext
|
.ctx = backgroundTaskContext
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* flag to prevent duplicate termination and cancellation of task executors */
|
||||||
|
bool terminateExecutorsStarted = false;
|
||||||
|
bool cancelExecutorsStarted = false;
|
||||||
|
|
||||||
/* loop exits if there is no running or runnable tasks left */
|
/* loop exits if there is no running or runnable tasks left */
|
||||||
bool hasAnyTask = true;
|
bool hasAnyTask = true;
|
||||||
while (hasAnyTask)
|
while (hasAnyTask)
|
||||||
|
@ -868,15 +1022,47 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
|
||||||
/* handle signals */
|
/* handle signals */
|
||||||
CHECK_FOR_INTERRUPTS();
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* if the flag is set, we should terminate all task executor workers to prevent duplicate
|
||||||
|
* runs of the same task on the next start of the monitor, which is dangerous for non-idempotent
|
||||||
|
* tasks. We do not break the loop here as we want to reflect tasks' messages. Hence, we wait until
|
||||||
|
* all tasks finish and also do not allow new runnable tasks to start running. After all current tasks
|
||||||
|
* finish, we can exit the loop safely.
|
||||||
|
*/
|
||||||
|
if (GotSigterm && !terminateExecutorsStarted)
|
||||||
|
{
|
||||||
|
ereport(LOG, (errmsg("handling termination signal")));
|
||||||
|
terminateExecutorsStarted = true;
|
||||||
|
TerminateAllTaskExecutors(queueMonitorExecutionContext.currentExecutors);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (GotSigint && !cancelExecutorsStarted)
|
||||||
|
{
|
||||||
|
ereport(LOG, (errmsg("handling cancellation signal")));
|
||||||
|
cancelExecutorsStarted = true;
|
||||||
|
CancelAllTaskExecutors(queueMonitorExecutionContext.currentExecutors);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (GotSighup)
|
||||||
|
{
|
||||||
|
GotSighup = false;
|
||||||
|
|
||||||
|
/* update max_background_task_executors if changed */
|
||||||
|
ProcessConfigFile(PGC_SIGHUP);
|
||||||
|
}
|
||||||
|
|
||||||
/* invalidate cache for new data in catalog */
|
/* invalidate cache for new data in catalog */
|
||||||
InvalidateMetadataSystemCache();
|
InvalidateMetadataSystemCache();
|
||||||
|
|
||||||
/* assign runnable tasks, if any, to new task executors in a transaction */
|
/* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */
|
||||||
StartTransactionCommand();
|
if (!MonitorGotTerminationOrCancellationRequest())
|
||||||
PushActiveSnapshot(GetTransactionSnapshot());
|
{
|
||||||
AssignRunnableTasks(&queueMonitorExecutionContext);
|
StartTransactionCommand();
|
||||||
PopActiveSnapshot();
|
PushActiveSnapshot(GetTransactionSnapshot());
|
||||||
CommitTransactionCommand();
|
AssignRunnableTasks(&queueMonitorExecutionContext);
|
||||||
|
PopActiveSnapshot();
|
||||||
|
CommitTransactionCommand();
|
||||||
|
}
|
||||||
|
|
||||||
/* get running task entries from hash table */
|
/* get running task entries from hash table */
|
||||||
List *runningTaskEntries = GetRunningTaskEntries(
|
List *runningTaskEntries = GetRunningTaskEntries(
|
||||||
|
@ -1268,7 +1454,8 @@ ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadE
|
||||||
* environment to the executor.
|
* environment to the executor.
|
||||||
*/
|
*/
|
||||||
static dsm_segment *
|
static dsm_segment *
|
||||||
StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId)
|
StoreArgumentsInDSM(char *database, char *username, char *command,
|
||||||
|
int64 taskId, int64 jobId)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Create the shared memory that we will pass to the background
|
* Create the shared memory that we will pass to the background
|
||||||
|
@ -1284,6 +1471,7 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId)
|
||||||
#define QUEUE_SIZE ((Size) 65536)
|
#define QUEUE_SIZE ((Size) 65536)
|
||||||
shm_toc_estimate_chunk(&e, QUEUE_SIZE);
|
shm_toc_estimate_chunk(&e, QUEUE_SIZE);
|
||||||
shm_toc_estimate_chunk(&e, sizeof(int64));
|
shm_toc_estimate_chunk(&e, sizeof(int64));
|
||||||
|
shm_toc_estimate_chunk(&e, sizeof(int64));
|
||||||
shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS);
|
shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS);
|
||||||
Size segsize = shm_toc_estimate(&e);
|
Size segsize = shm_toc_estimate(&e);
|
||||||
|
|
||||||
|
@ -1330,6 +1518,10 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId)
|
||||||
*taskIdTarget = taskId;
|
*taskIdTarget = taskId;
|
||||||
shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, taskIdTarget);
|
shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, taskIdTarget);
|
||||||
|
|
||||||
|
int64 *jobIdTarget = shm_toc_allocate(toc, sizeof(int64));
|
||||||
|
*jobIdTarget = jobId;
|
||||||
|
shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, jobIdTarget);
|
||||||
|
|
||||||
shm_mq_attach(mq, seg, NULL);
|
shm_mq_attach(mq, seg, NULL);
|
||||||
|
|
||||||
return seg;
|
return seg;
|
||||||
|
@ -1343,17 +1535,17 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId)
|
||||||
* pointer to the dynamic shared memory.
|
* pointer to the dynamic shared memory.
|
||||||
*/
|
*/
|
||||||
static BackgroundWorkerHandle *
|
static BackgroundWorkerHandle *
|
||||||
StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId,
|
StartCitusBackgroundTaskExecutor(char *database, char *user, char *command,
|
||||||
dsm_segment **pSegment)
|
int64 taskId, int64 jobId, dsm_segment **pSegment)
|
||||||
{
|
{
|
||||||
dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId);
|
dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId);
|
||||||
|
|
||||||
/* Configure a worker. */
|
/* Configure a worker. */
|
||||||
BackgroundWorker worker = { 0 };
|
BackgroundWorker worker = { 0 };
|
||||||
memset(&worker, 0, sizeof(worker));
|
memset(&worker, 0, sizeof(worker));
|
||||||
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
|
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
|
||||||
"Citus Background Task Queue Executor: %s/%s",
|
"Citus Background Task Queue Executor: %s/%s for (%ld/%ld)",
|
||||||
database, user);
|
database, user, jobId, taskId);
|
||||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||||
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
||||||
|
|
||||||
|
@ -1391,6 +1583,8 @@ typedef struct CitusBackgroundJobExecutorErrorCallbackContext
|
||||||
{
|
{
|
||||||
const char *database;
|
const char *database;
|
||||||
const char *username;
|
const char *username;
|
||||||
|
int64 taskId;
|
||||||
|
int64 jobId;
|
||||||
} CitusBackgroundJobExecutorErrorCallbackContext;
|
} CitusBackgroundJobExecutorErrorCallbackContext;
|
||||||
|
|
||||||
|
|
||||||
|
@ -1403,8 +1597,9 @@ CitusBackgroundJobExecutorErrorCallback(void *arg)
|
||||||
{
|
{
|
||||||
CitusBackgroundJobExecutorErrorCallbackContext *context =
|
CitusBackgroundJobExecutorErrorCallbackContext *context =
|
||||||
(CitusBackgroundJobExecutorErrorCallbackContext *) arg;
|
(CitusBackgroundJobExecutorErrorCallbackContext *) arg;
|
||||||
errcontext("Citus Background Task Queue Executor: %s/%s", context->database,
|
errcontext("Citus Background Task Queue Executor: %s/%s for (%ld/%ld)",
|
||||||
context->username);
|
context->database, context->username,
|
||||||
|
context->jobId, context->taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1418,10 +1613,6 @@ CitusBackgroundJobExecutorErrorCallback(void *arg)
|
||||||
void
|
void
|
||||||
CitusBackgroundTaskExecutor(Datum main_arg)
|
CitusBackgroundTaskExecutor(Datum main_arg)
|
||||||
{
|
{
|
||||||
/*
|
|
||||||
* TODO figure out if we need this signal handler that is in pgcron
|
|
||||||
* pqsignal(SIGTERM, pg_cron_background_worker_sigterm);
|
|
||||||
*/
|
|
||||||
BackgroundWorkerUnblockSignals();
|
BackgroundWorkerUnblockSignals();
|
||||||
|
|
||||||
/* Set up a dynamic shared memory segment. */
|
/* Set up a dynamic shared memory segment. */
|
||||||
|
@ -1445,6 +1636,7 @@ CitusBackgroundTaskExecutor(Datum main_arg)
|
||||||
char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false);
|
char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false);
|
||||||
char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false);
|
char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false);
|
||||||
int64 *taskId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, false);
|
int64 *taskId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, false);
|
||||||
|
int64 *jobId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, false);
|
||||||
shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false);
|
shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false);
|
||||||
|
|
||||||
shm_mq_set_sender(mq, MyProc);
|
shm_mq_set_sender(mq, MyProc);
|
||||||
|
@ -1456,6 +1648,8 @@ CitusBackgroundTaskExecutor(Datum main_arg)
|
||||||
CitusBackgroundJobExecutorErrorCallbackContext context = {
|
CitusBackgroundJobExecutorErrorCallbackContext context = {
|
||||||
.database = database,
|
.database = database,
|
||||||
.username = username,
|
.username = username,
|
||||||
|
.taskId = *taskId,
|
||||||
|
.jobId = *jobId,
|
||||||
};
|
};
|
||||||
errorCallback.callback = CitusBackgroundJobExecutorErrorCallback;
|
errorCallback.callback = CitusBackgroundJobExecutorErrorCallback;
|
||||||
errorCallback.arg = (void *) &context;
|
errorCallback.arg = (void *) &context;
|
||||||
|
@ -1482,8 +1676,8 @@ CitusBackgroundTaskExecutor(Datum main_arg)
|
||||||
/* Prepare to execute the query. */
|
/* Prepare to execute the query. */
|
||||||
SetCurrentStatementStartTimestamp();
|
SetCurrentStatementStartTimestamp();
|
||||||
debug_query_string = command;
|
debug_query_string = command;
|
||||||
char *appname = psprintf("citus background task queue executor (taskId %ld)",
|
char *appname = psprintf("citus background task queue executor (%ld/%ld)",
|
||||||
*taskId);
|
*jobId, *taskId);
|
||||||
pgstat_report_appname(appname);
|
pgstat_report_appname(appname);
|
||||||
pgstat_report_activity(STATE_RUNNING, command);
|
pgstat_report_activity(STATE_RUNNING, command);
|
||||||
StartTransactionCommand();
|
StartTransactionCommand();
|
||||||
|
|
|
@ -28,6 +28,7 @@ typedef struct BackgroundExecutorHashEntry
|
||||||
|
|
||||||
BackgroundWorkerHandle *handle;
|
BackgroundWorkerHandle *handle;
|
||||||
dsm_segment *seg;
|
dsm_segment *seg;
|
||||||
|
int64 jobid;
|
||||||
StringInfo message;
|
StringInfo message;
|
||||||
} BackgroundExecutorHashEntry;
|
} BackgroundExecutorHashEntry;
|
||||||
|
|
||||||
|
|
|
@ -125,6 +125,12 @@ SELECT citus_job_cancel(:job_id);
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id);
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
|
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
|
||||||
state | did_start
|
state | did_start
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -242,6 +248,12 @@ SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the l
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
SELECT citus_job_wait(:job_id3, desired_status => 'running');
|
SELECT citus_job_wait(:job_id3, desired_status => 'running');
|
||||||
citus_job_wait
|
citus_job_wait
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -278,12 +290,6 @@ SELECT citus_job_wait(:job_id1);
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT citus_job_wait(:job_id2);
|
|
||||||
citus_job_wait
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT citus_job_wait(:job_id3);
|
SELECT citus_job_wait(:job_id3);
|
||||||
citus_job_wait
|
citus_job_wait
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -302,11 +308,262 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
|
||||||
9 | 15 | cancelled
|
9 | 15 | cancelled
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
-- verify that task is not starved by currently long running task
|
-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
|
||||||
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset
|
||||||
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
|
||||||
|
COMMIT;
|
||||||
|
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
||||||
|
ORDER BY task_id; -- show that last task is not running but ready to run(runnable)
|
||||||
|
task_id | status
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
16 | running
|
||||||
|
17 | running
|
||||||
|
18 | running
|
||||||
|
19 | running
|
||||||
|
20 | runnable
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
ALTER SYSTEM SET citus.max_background_task_executors TO 5;
|
||||||
|
SELECT pg_reload_conf(); -- the last runnable task will be running after change
|
||||||
|
pg_reload_conf
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id3, desired_status => 'running');
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
||||||
|
ORDER BY task_id; -- show that last task is running
|
||||||
|
task_id | status
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
16 | running
|
||||||
|
17 | running
|
||||||
|
18 | running
|
||||||
|
19 | running
|
||||||
|
20 | running
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
SELECT citus_job_cancel(:job_id1);
|
||||||
|
citus_job_cancel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_cancel(:job_id2);
|
||||||
|
citus_job_cancel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_cancel(:job_id3);
|
||||||
|
citus_job_cancel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id1);
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id2);
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id3);
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
||||||
|
ORDER BY task_id; -- show that all tasks are cancelled
|
||||||
|
task_id | status
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
16 | cancelled
|
||||||
|
17 | cancelled
|
||||||
|
18 | cancelled
|
||||||
|
19 | cancelled
|
||||||
|
20 | cancelled
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
|
||||||
|
COMMIT;
|
||||||
|
SELECT citus_job_wait(:job_id1, desired_status => 'running');
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id2, desired_status => 'running');
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id;
|
||||||
|
task_id | status
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
21 | running
|
||||||
|
22 | running
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
|
||||||
|
SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process
|
||||||
|
pg_terminate_backend
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT pg_sleep(2); -- wait enough to show that tasks are terminated
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT task_id, status, retry_count, message FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal
|
||||||
|
task_id | status | retry_count | message
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
21 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (13/21)" due to administrator command+
|
||||||
|
| | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (13/21) +
|
||||||
|
| | |
|
||||||
|
22 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (14/22)" due to administrator command+
|
||||||
|
| | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (14/22) +
|
||||||
|
| | |
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT citus_job_cancel(:job_id1);
|
||||||
|
citus_job_cancel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_cancel(:job_id2);
|
||||||
|
citus_job_cancel
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id1);
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id2);
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id; -- show that all tasks are cancelled
|
||||||
|
task_id | status
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
21 | cancelled
|
||||||
|
22 | cancelled
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- verify that upon cancellation signal, all tasks are cancelled
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
|
||||||
|
COMMIT;
|
||||||
|
SELECT citus_job_wait(:job_id1, desired_status => 'running');
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id2, desired_status => 'running');
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id;
|
||||||
|
task_id | status
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
23 | running
|
||||||
|
24 | running
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
|
||||||
|
SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process
|
||||||
|
pg_cancel_backend
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id1);
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id2);
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id; -- show that all tasks are cancelled
|
||||||
|
task_id | status
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
23 | cancelled
|
||||||
|
24 | cancelled
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- verify that task is not starved by currently long running task
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset
|
||||||
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SELECT citus_job_wait(:job_id1, desired_status => 'running');
|
SELECT citus_job_wait(:job_id1, desired_status => 'running');
|
||||||
|
@ -326,8 +583,8 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
|
||||||
ORDER BY job_id, task_id; -- show that last task is finished without starvation
|
ORDER BY job_id, task_id; -- show that last task is finished without starvation
|
||||||
job_id | task_id | status
|
job_id | task_id | status
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
10 | 16 | running
|
17 | 25 | running
|
||||||
11 | 17 | done
|
18 | 26 | done
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
SELECT citus_job_cancel(:job_id1);
|
SELECT citus_job_cancel(:job_id1);
|
||||||
|
@ -336,6 +593,21 @@ SELECT citus_job_cancel(:job_id1);
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id1);
|
||||||
|
citus_job_wait
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT job_id, task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY job_id, task_id; -- show that task is cancelled
|
||||||
|
job_id | task_id | status
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
17 | 25 | cancelled
|
||||||
|
18 | 26 | done
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA background_task_queue_monitor CASCADE;
|
DROP SCHEMA background_task_queue_monitor CASCADE;
|
||||||
ALTER SYSTEM RESET citus.background_task_queue_interval;
|
ALTER SYSTEM RESET citus.background_task_queue_interval;
|
||||||
|
|
|
@ -54,6 +54,7 @@ SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before >
|
||||||
|
|
||||||
-- test cancelling a failed/retrying job
|
-- test cancelling a failed/retrying job
|
||||||
SELECT citus_job_cancel(:job_id);
|
SELECT citus_job_cancel(:job_id);
|
||||||
|
SELECT citus_job_wait(:job_id);
|
||||||
|
|
||||||
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
|
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
|
||||||
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
|
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
|
||||||
|
@ -110,6 +111,7 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
|
||||||
ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable)
|
ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable)
|
||||||
|
|
||||||
SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running
|
SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running
|
||||||
|
SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled
|
||||||
SELECT citus_job_wait(:job_id3, desired_status => 'running');
|
SELECT citus_job_wait(:job_id3, desired_status => 'running');
|
||||||
SELECT job_id, task_id, status FROM pg_dist_background_task
|
SELECT job_id, task_id, status FROM pg_dist_background_task
|
||||||
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
||||||
|
@ -118,18 +120,115 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
|
||||||
SELECT citus_job_cancel(:job_id1);
|
SELECT citus_job_cancel(:job_id1);
|
||||||
SELECT citus_job_cancel(:job_id3);
|
SELECT citus_job_cancel(:job_id3);
|
||||||
SELECT citus_job_wait(:job_id1);
|
SELECT citus_job_wait(:job_id1);
|
||||||
SELECT citus_job_wait(:job_id2);
|
|
||||||
SELECT citus_job_wait(:job_id3);
|
SELECT citus_job_wait(:job_id3);
|
||||||
SELECT job_id, task_id, status FROM pg_dist_background_task
|
SELECT job_id, task_id, status FROM pg_dist_background_task
|
||||||
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
||||||
ORDER BY job_id, task_id; -- show that multiple cancels worked
|
ORDER BY job_id, task_id; -- show that multiple cancels worked
|
||||||
|
|
||||||
|
|
||||||
-- verify that task is not starved by currently long running task
|
-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
|
||||||
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset
|
||||||
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
||||||
|
ORDER BY task_id; -- show that last task is not running but ready to run(runnable)
|
||||||
|
|
||||||
|
ALTER SYSTEM SET citus.max_background_task_executors TO 5;
|
||||||
|
SELECT pg_reload_conf(); -- the last runnable task will be running after change
|
||||||
|
SELECT citus_job_wait(:job_id3, desired_status => 'running');
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
||||||
|
ORDER BY task_id; -- show that last task is running
|
||||||
|
|
||||||
|
SELECT citus_job_cancel(:job_id1);
|
||||||
|
SELECT citus_job_cancel(:job_id2);
|
||||||
|
SELECT citus_job_cancel(:job_id3);
|
||||||
|
SELECT citus_job_wait(:job_id1);
|
||||||
|
SELECT citus_job_wait(:job_id2);
|
||||||
|
SELECT citus_job_wait(:job_id3);
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
|
||||||
|
ORDER BY task_id; -- show that all tasks are cancelled
|
||||||
|
|
||||||
|
-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id1, desired_status => 'running');
|
||||||
|
SELECT citus_job_wait(:job_id2, desired_status => 'running');
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id;
|
||||||
|
|
||||||
|
|
||||||
|
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
|
||||||
|
SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process
|
||||||
|
|
||||||
|
SELECT pg_sleep(2); -- wait enough to show that tasks are terminated
|
||||||
|
|
||||||
|
SELECT task_id, status, retry_count, message FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal
|
||||||
|
|
||||||
|
SELECT citus_job_cancel(:job_id1);
|
||||||
|
SELECT citus_job_cancel(:job_id2);
|
||||||
|
SELECT citus_job_wait(:job_id1);
|
||||||
|
SELECT citus_job_wait(:job_id2);
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id; -- show that all tasks are cancelled
|
||||||
|
|
||||||
|
-- verify that upon cancellation signal, all tasks are cancelled
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id1, desired_status => 'running');
|
||||||
|
SELECT citus_job_wait(:job_id2, desired_status => 'running');
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id;
|
||||||
|
|
||||||
|
|
||||||
|
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
|
||||||
|
SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process
|
||||||
|
|
||||||
|
SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled
|
||||||
|
|
||||||
|
SELECT citus_job_wait(:job_id1);
|
||||||
|
SELECT citus_job_wait(:job_id2);
|
||||||
|
|
||||||
|
SELECT task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY task_id; -- show that all tasks are cancelled
|
||||||
|
|
||||||
|
-- verify that task is not starved by currently long running task
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
|
||||||
|
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset
|
||||||
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
|
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
@ -139,6 +238,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
|
||||||
WHERE task_id IN (:task_id1, :task_id2)
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
ORDER BY job_id, task_id; -- show that last task is finished without starvation
|
ORDER BY job_id, task_id; -- show that last task is finished without starvation
|
||||||
SELECT citus_job_cancel(:job_id1);
|
SELECT citus_job_cancel(:job_id1);
|
||||||
|
SELECT citus_job_wait(:job_id1);
|
||||||
|
|
||||||
|
SELECT job_id, task_id, status FROM pg_dist_background_task
|
||||||
|
WHERE task_id IN (:task_id1, :task_id2)
|
||||||
|
ORDER BY job_id, task_id; -- show that task is cancelled
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA background_task_queue_monitor CASCADE;
|
DROP SCHEMA background_task_queue_monitor CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue