Support pausing jobs #104313

Manually merged
Sybren A. Stüvel merged 28 commits from David-Zhang-10/flamenco:paused-job-status into main 2024-07-01 17:53:44 +02:00
Showing only changes of commit adac7bbb37 - Show all commits

View File

@ -166,7 +166,8 @@ func (sm *StateMachine) jobStatusIfAThenB(
return sm.JobStatusChange(ctx, job, thenStatus, reason) return sm.JobStatusChange(ctx, job, thenStatus, reason)
} }
func (sm *StateMachine) shouldJobBePaused(ctx context.Context, logger zerolog.Logger, job *persistence.Job) (bool, error) { // isJobPausingComplete returns true when the job status is pause-requested and there are no more active tasks.
func (sm *StateMachine) isJobPausingComplete(ctx context.Context, logger zerolog.Logger, job *persistence.Job) (bool, error) {
if job.Status == api.JobStatusPauseRequested { if job.Status == api.JobStatusPauseRequested {
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive) numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
if err != nil { if err != nil {
@ -196,7 +197,7 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
} }
// Deal with the special case when the job is in pause-requested status. // Deal with the special case when the job is in pause-requested status.
toBePaused, err := sm.shouldJobBePaused(ctx, logger, job) toBePaused, err := sm.isJobPausingComplete(ctx, logger, job)
if err != nil { if err != nil {
return err return err
} }
@ -230,7 +231,7 @@ func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger
failLogger.Info().Msg("task failed, but not enough to fail the job") failLogger.Info().Msg("task failed, but not enough to fail the job")
// Deal with the special case when the job is in pause-requested status. // Deal with the special case when the job is in pause-requested status.
toBePaused, err := sm.shouldJobBePaused(ctx, logger, job) toBePaused, err := sm.isJobPausingComplete(ctx, logger, job)
if err != nil { if err != nil {
return err return err
} }
@ -254,7 +255,7 @@ func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logg
} }
// Deal with the special case when the job is in pause-requested status. // Deal with the special case when the job is in pause-requested status.
toBePaused, err := sm.shouldJobBePaused(ctx, logger, job) toBePaused, err := sm.isJobPausingComplete(ctx, logger, job)
if err != nil { if err != nil {
return err return err
} }
@ -509,7 +510,7 @@ func (sm *StateMachine) pauseTasks(
} }
// If pausing was requested, it has now happened, so the job can transition. // If pausing was requested, it has now happened, so the job can transition.
toBePaused, err := sm.shouldJobBePaused(ctx, logger, job) toBePaused, err := sm.isJobPausingComplete(ctx, logger, job)
if err != nil { if err != nil {
return "", fmt.Errorf("error when accessing number of active tasks") return "", fmt.Errorf("error when accessing number of active tasks")
} }