Support pausing jobs #104313
@ -119,7 +119,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
|
|||||||
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeueing, "task was queued")
|
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeueing, "task was queued")
|
||||||
|
|
||||||
case api.TaskStatusPaused:
|
case api.TaskStatusPaused:
|
||||||
return sm.updateJobOnTaskStatusPaused(ctx, logger, job)
|
return nil
|
||||||
|
|
||||||
case api.TaskStatusCanceled:
|
case api.TaskStatusCanceled:
|
||||||
return sm.updateJobOnTaskStatusCanceled(ctx, logger, job)
|
return sm.updateJobOnTaskStatusCanceled(ctx, logger, job)
|
||||||
@ -182,38 +182,6 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateJobOnTaskStatusPaused conditionally escalates the pausing of a task to pause the job.
|
|
||||||
func (sm *StateMachine) updateJobOnTaskStatusPaused(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
|
|
||||||
// If no more tasks can run, pause the job.
|
|
||||||
numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job,
|
|
||||||
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if numRunnable == 0 {
|
|
||||||
logger.Info().Msg("paused task was last runnable task of job, pausing job")
|
|
||||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "paused task was last runnable task of job, pausing job")
|
|
||||||
}
|
|
||||||
|
|
||||||
if job.Status == api.JobStatusPauseRequested {
|
|
||||||
// if the job is in pause-requested state, and all other tasks are paused,
|
|
||||||
// then the job can be paused.
|
|
||||||
numPaused, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusPaused)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if numPaused == numTotal {
|
|
||||||
logger.Info().Msg("all tasks of job are paused, job is paused")
|
|
||||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks paused")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// if the job is not in pause-requested state, then some error occurred and the job should be failed.
|
|
||||||
logger.Info().Msg("task cannot be changed to paused when job is not in pause-requested state")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateJobOnTaskStatusFailed conditionally escalates the failure of a task to fail the entire job.
|
// updateJobOnTaskStatusFailed conditionally escalates the failure of a task to fail the entire job.
|
||||||
func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
|
func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
|
||||||
// Count the number of failed tasks. If it is over the threshold, fail the job.
|
// Count the number of failed tasks. If it is over the threshold, fail the job.
|
||||||
|
Loading…
Reference in New Issue
Block a user