Support pausing jobs #104313

Manually merged
Sybren A. Stüvel merged 28 commits from David-Zhang-10/flamenco:paused-job-status into main 2024-07-01 17:53:44 +02:00
2 changed files with 73 additions and 8 deletions
Showing only changes of commit 484f746f92 - Show all commits

View File

@ -509,11 +509,11 @@ func (sm *StateMachine) pauseTasks(
} }
// If pausing was requested, it has now happened, so the job can transition. // If pausing was requested, it has now happened, so the job can transition.
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive) toBePaused, err := sm.shouldJobBePaused(ctx, logger, job)
if err != nil { if err != nil {
return "", fmt.Errorf("error when accessing number of active tasks") return "", fmt.Errorf("error when accessing number of active tasks")
} }
if job.Status == api.JobStatusPauseRequested && numActive == 0 { if toBePaused {
logger.Info().Msg("all tasks of job paused, job can go to 'paused' status") logger.Info().Msg("all tasks of job paused, job can go to 'paused' status")
return api.JobStatusPaused, nil return api.JobStatusPaused, nil
} }

View File

@ -340,7 +340,7 @@ func TestJobPauseWithAllQueuedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusQueued) task1 := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
task2 := taskOfSameJob(task1, api.TaskStatusQueued) task2 := taskOfSameJob(task1, api.TaskStatusQueued)
task3 := taskOfSameJob(task2, api.TaskStatusQueued) task3 := taskOfSameJob(task2, api.TaskStatusQueued)
job := task3.Job job := task3.Job
@ -348,17 +348,82 @@ func TestJobPauseWithAllQueuedTasks(t *testing.T) {
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested) mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks. // Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatuses(ctx, job, api.TaskStatusPaused, mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
"Paused because job transitioned status from \"active\" to \"paused\"") []api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused) mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused) mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused)
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest")) require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
} }
func TestJobPauseWithSomeCompletedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusQueued, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusQueued)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
job := task3.Job
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused)
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestJobPauseWithSomeActiveTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
task2 := taskOfSameJob(task1, api.TaskStatusCompleted)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
job := task3.Job
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(1, 3, nil)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusPauseRequested)
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestCheckStuck(t *testing.T) { func TestCheckStuck(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()