WIP: Allow jobs to be submitted in paused status #104318

Closed
David Zhang wants to merge 32 commits from David-Zhang-10/flamenco:submit-as-paused into main

When changing the target branch, be careful to rebase the branch in your fork to match. See documentation.
Showing only changes of commit 71c7715eab - Show all commits

View File

@ -169,7 +169,7 @@ func (sm *StateMachine) jobStatusIfAThenB(
func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error { func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
// If no more tasks can run, cancel the job. // If no more tasks can run, cancel the job.
numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed) api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed, api.TaskStatusPaused)
if err != nil { if err != nil {
return err return err
} }
@ -179,6 +179,16 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job") return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job")
} }
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
if err != nil {
return err
}
if numActive == 0 && job.Status == api.JobStatusPauseRequested {
// there is no active task, and the job is in pause-requested status, so we can pause the job
logger.Info().Msg("all tasks of job are completed, job is paused")
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
}
return nil return nil
} }
@ -203,6 +213,16 @@ func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger
} }
// If the job didn't fail, this failure indicates that at least the job is active. // If the job didn't fail, this failure indicates that at least the job is active.
failLogger.Info().Msg("task failed, but not enough to fail the job") failLogger.Info().Msg("task failed, but not enough to fail the job")
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
if err != nil {
return err
}
if numActive == 0 && job.Status == api.JobStatusPauseRequested {
// there is no active task, and the job is in pause-requested status, so we can pause the job
logger.Info().Msg("all tasks of job are completed, job is paused")
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
}
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive,
"task failed, but not enough to fail the job") "task failed, but not enough to fail the job")
} }
@ -473,6 +493,16 @@ func (sm *StateMachine) pauseTasks(
return "", fmt.Errorf("pausing tasks of job %s: %w", job.UUID, err) return "", fmt.Errorf("pausing tasks of job %s: %w", job.UUID, err)
} }
// If pausing was requested, it has now happened, so the job can transition.
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
if err != nil {
return "", fmt.Errorf("error when accessing number of active tasks")
}
if job.Status == api.JobStatusPauseRequested && numActive == 0 {
logger.Info().Msg("all tasks of job paused, job can go to 'paused' status")
return api.JobStatusPaused, nil
}
return api.JobStatusPauseRequested, nil return api.JobStatusPauseRequested, nil
} }