Support pausing jobs #104313
@ -167,7 +167,7 @@ func (sm *StateMachine) jobStatusIfAThenB(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// isJobPausingComplete returns true when the job status is pause-requested and there are no more active tasks.
|
// isJobPausingComplete returns true when the job status is pause-requested and there are no more active tasks.
|
||||||
func (sm *StateMachine) isJobPausingComplete(ctx context.Context, logger zerolog.Logger, job *persistence.Job) (bool, error) {
|
func (sm *StateMachine) isJobPausingComplete(ctx context.Context, job *persistence.Job) (bool, error) {
|
||||||
if job.Status != api.JobStatusPauseRequested {
|
if job.Status != api.JobStatusPauseRequested {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -175,12 +175,7 @@ func (sm *StateMachine) isJobPausingComplete(ctx context.Context, logger zerolog
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
if numActive == 0 {
|
return numActive == 0, nil
|
||||||
// There is no active task, and the job is in pause-requested status, so we can pause the job.
|
|
||||||
logger.Info().Msg("No more active tasks, job is paused")
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job.
|
// updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job.
|
||||||
@ -198,7 +193,7 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Deal with the special case when the job is in pause-requested status.
|
// Deal with the special case when the job is in pause-requested status.
|
||||||
toBePaused, err := sm.isJobPausingComplete(ctx, logger, job)
|
toBePaused, err := sm.isJobPausingComplete(ctx, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -232,7 +227,7 @@ func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger
|
|||||||
failLogger.Info().Msg("task failed, but not enough to fail the job")
|
failLogger.Info().Msg("task failed, but not enough to fail the job")
|
||||||
|
|
||||||
// Deal with the special case when the job is in pause-requested status.
|
// Deal with the special case when the job is in pause-requested status.
|
||||||
toBePaused, err := sm.isJobPausingComplete(ctx, logger, job)
|
toBePaused, err := sm.isJobPausingComplete(ctx, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -256,7 +251,7 @@ func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logg
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Deal with the special case when the job is in pause-requested status.
|
// Deal with the special case when the job is in pause-requested status.
|
||||||
toBePaused, err := sm.isJobPausingComplete(ctx, logger, job)
|
toBePaused, err := sm.isJobPausingComplete(ctx, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -511,9 +506,9 @@ func (sm *StateMachine) pauseTasks(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// If pausing was requested, it has now happened, so the job can transition.
|
// If pausing was requested, it has now happened, so the job can transition.
|
||||||
toBePaused, err := sm.isJobPausingComplete(ctx, logger, job)
|
toBePaused, err := sm.isJobPausingComplete(ctx, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("error when accessing number of active tasks")
|
return "", err
|
||||||
}
|
}
|
||||||
if toBePaused {
|
if toBePaused {
|
||||||
logger.Info().Msg("all tasks of job paused, job can go to 'paused' status")
|
logger.Info().Msg("all tasks of job paused, job can go to 'paused' status")
|
||||||
|
Loading…
Reference in New Issue
Block a user