Support pausing jobs #104313
@ -166,11 +166,26 @@ func (sm *StateMachine) jobStatusIfAThenB(
|
||||
return sm.JobStatusChange(ctx, job, thenStatus, reason)
|
||||
}
|
||||
|
||||
func (sm *StateMachine) shouldJobBePaused(ctx context.Context, logger zerolog.Logger, job *persistence.Job) (bool, error) {
|
||||
if job.Status == api.JobStatusPauseRequested {
|
||||
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if numActive == 0 {
|
||||
// There is no active task, and the job is in pause-requested status, so we can pause the job.
|
||||
logger.Info().Msg("No more active tasks, job is paused")
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job.
|
||||
func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
|
||||
// If no more tasks can run, cancel the job.
|
||||
numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job,
|
||||
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed, api.TaskStatusPaused)
|
||||
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -181,16 +196,12 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
|
||||
}
|
||||
|
||||
// Deal with the special case when the job is in pause-requested status.
|
||||
if job.Status == api.JobStatusPauseRequested {
|
||||
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numActive == 0 {
|
||||
// There is no active task, and the job is in pause-requested status, so we can pause the job.
|
||||
logger.Info().Msg("No more active tasks, job is paused")
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
||||
}
|
||||
toBePaused, err := sm.shouldJobBePaused(ctx, logger, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if toBePaused {
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task cancellation")
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -218,16 +229,13 @@ func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger
|
||||
// If the job didn't fail, this failure indicates that at least the job is active.
|
||||
failLogger.Info().Msg("task failed, but not enough to fail the job")
|
||||
|
||||
if job.Status == api.JobStatusPauseRequested {
|
||||
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numActive == 0 {
|
||||
// There is no active task, and the job is in pause-requested status, so we can pause the job.
|
||||
failLogger.Info().Msg("No more active tasks, job is paused")
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
||||
}
|
||||
// Deal with the special case when the job is in pause-requested status.
|
||||
toBePaused, err := sm.shouldJobBePaused(ctx, logger, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if toBePaused {
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task failure")
|
||||
}
|
||||
|
||||
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive,
|
||||
@ -245,16 +253,13 @@ func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logg
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed")
|
||||
}
|
||||
|
||||
if job.Status == api.JobStatusPauseRequested {
|
||||
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numActive == 0 {
|
||||
// There is no active task, and the job is in pause-requested status, so we can pause the job.
|
||||
logger.Info().Msg("No more active tasks, job is paused")
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
||||
}
|
||||
// Deal with the special case when the job is in pause-requested status.
|
||||
toBePaused, err := sm.shouldJobBePaused(ctx, logger, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if toBePaused {
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task completion")
|
||||
}
|
||||
|
||||
logger.Info().
|
||||
|
Loading…
Reference in New Issue
Block a user