WIP: Allow jobs to be submitted in paused status #104318

Closed
David Zhang wants to merge 32 commits from David-Zhang-10/flamenco:submit-as-paused into main

When changing the target branch, be careful to rebase the branch in your fork to match. See documentation.
2 changed files with 111 additions and 41 deletions
Showing only changes of commit 3f53a9ad2a - Show all commits

View File

@ -166,11 +166,26 @@ func (sm *StateMachine) jobStatusIfAThenB(
return sm.JobStatusChange(ctx, job, thenStatus, reason) return sm.JobStatusChange(ctx, job, thenStatus, reason)
} }
func (sm *StateMachine) shouldJobBePaused(ctx context.Context, logger zerolog.Logger, job *persistence.Job) (bool, error) {
if job.Status == api.JobStatusPauseRequested {
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
if err != nil {
return false, err
}
if numActive == 0 {
// There is no active task, and the job is in pause-requested status, so we can pause the job.
logger.Info().Msg("No more active tasks, job is paused")
return true, nil
}
}
return false, nil
}
// updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job. // updateJobOnTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job.
func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error { func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
// If no more tasks can run, cancel the job. // If no more tasks can run, cancel the job.
numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, numRunnable, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed, api.TaskStatusPaused) api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed)
if err != nil { if err != nil {
return err return err
} }
@ -181,16 +196,12 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
} }
// Deal with the special case when the job is in pause-requested status. // Deal with the special case when the job is in pause-requested status.
if job.Status == api.JobStatusPauseRequested { toBePaused, err := sm.shouldJobBePaused(ctx, logger, job)
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive) if err != nil {
if err != nil { return err
return err }
} if toBePaused {
if numActive == 0 { return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task cancellation")
// There is no active task, and the job is in pause-requested status, so we can pause the job.
logger.Info().Msg("No more active tasks, job is paused")
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
}
} }
return nil return nil
@ -218,16 +229,13 @@ func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger
// If the job didn't fail, this failure indicates that at least the job is active. // If the job didn't fail, this failure indicates that at least the job is active.
failLogger.Info().Msg("task failed, but not enough to fail the job") failLogger.Info().Msg("task failed, but not enough to fail the job")
if job.Status == api.JobStatusPauseRequested { // Deal with the special case when the job is in pause-requested status.
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive) toBePaused, err := sm.shouldJobBePaused(ctx, logger, job)
if err != nil { if err != nil {
return err return err
} }
if numActive == 0 { if toBePaused {
// There is no active task, and the job is in pause-requested status, so we can pause the job. return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task failure")
failLogger.Info().Msg("No more active tasks, job is paused")
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
}
} }
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive,
@ -245,16 +253,13 @@ func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logg
return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed") return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed")
} }
if job.Status == api.JobStatusPauseRequested { // Deal with the special case when the job is in pause-requested status.
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive) toBePaused, err := sm.shouldJobBePaused(ctx, logger, job)
if err != nil { if err != nil {
return err return err
} }
if numActive == 0 { if toBePaused {
// There is no active task, and the job is in pause-requested status, so we can pause the job. return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "no more active tasks after task completion")
logger.Info().Msg("No more active tasks, job is paused")
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
}
} }
logger.Info(). logger.Info().
@ -504,11 +509,11 @@ func (sm *StateMachine) pauseTasks(
} }
// If pausing was requested, it has now happened, so the job can transition. // If pausing was requested, it has now happened, so the job can transition.
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive) toBePaused, err := sm.shouldJobBePaused(ctx, logger, job)
if err != nil { if err != nil {
return "", fmt.Errorf("error when accessing number of active tasks") return "", fmt.Errorf("error when accessing number of active tasks")
} }
if job.Status == api.JobStatusPauseRequested && numActive == 0 { if toBePaused {
logger.Info().Msg("all tasks of job paused, job can go to 'paused' status") logger.Info().Msg("all tasks of job paused, job can go to 'paused' status")
return api.JobStatusPaused, nil return api.JobStatusPaused, nil
} }

View File

@ -185,7 +185,7 @@ func TestTaskStatusChangeCancelSingleTask(t *testing.T) {
mocks.expectWriteTaskLogTimestamped(t, task, "task changed status active -> canceled") mocks.expectWriteTaskLogTimestamped(t, task, "task changed status active -> canceled")
mocks.expectBroadcastTaskChange(task, api.TaskStatusActive, api.TaskStatusCanceled) mocks.expectBroadcastTaskChange(task, api.TaskStatusActive, api.TaskStatusCanceled)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed, api.TaskStatusPaused). api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
Return(1, 2, nil) Return(1, 2, nil)
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCanceled)) require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCanceled))
@ -194,7 +194,7 @@ func TestTaskStatusChangeCancelSingleTask(t *testing.T) {
mocks.expectWriteTaskLogTimestamped(t, task2, "task changed status queued -> canceled") mocks.expectWriteTaskLogTimestamped(t, task2, "task changed status queued -> canceled")
mocks.expectBroadcastTaskChange(task2, api.TaskStatusQueued, api.TaskStatusCanceled) mocks.expectBroadcastTaskChange(task2, api.TaskStatusQueued, api.TaskStatusCanceled)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job, mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed, api.TaskStatusPaused). api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed).
Return(0, 2, nil) Return(0, 2, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled) mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled)
mocks.expectBroadcastJobChange(task.Job, api.JobStatusCancelRequested, api.JobStatusCanceled) mocks.expectBroadcastJobChange(task.Job, api.JobStatusCancelRequested, api.JobStatusCanceled)
@ -340,7 +340,7 @@ func TestJobPauseWithAllQueuedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusQueued) task1 := taskWithStatus(api.JobStatusQueued, api.TaskStatusQueued)
task2 := taskOfSameJob(task1, api.TaskStatusQueued) task2 := taskOfSameJob(task1, api.TaskStatusQueued)
task3 := taskOfSameJob(task2, api.TaskStatusQueued) task3 := taskOfSameJob(task2, api.TaskStatusQueued)
job := task3.Job job := task3.Job
@ -348,17 +348,82 @@ func TestJobPauseWithAllQueuedTasks(t *testing.T) {
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested) mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks. // Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatuses(ctx, job, api.TaskStatusPaused, mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
"Paused because job transitioned status from \"active\" to \"paused\"") []api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused) mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused) mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused)
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest")) require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
} }
func TestJobPauseWithSomeCompletedTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusQueued, api.TaskStatusCompleted)
task2 := taskOfSameJob(task1, api.TaskStatusQueued)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
job := task3.Job
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(0, 3, nil)
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPaused)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusQueued, api.JobStatusPauseRequested)
mocks.expectBroadcastJobChange(job, api.JobStatusPauseRequested, api.JobStatusPaused)
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestJobPauseWithSomeActiveTasks(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish()
task1 := taskWithStatus(api.JobStatusActive, api.TaskStatusActive)
task2 := taskOfSameJob(task1, api.TaskStatusCompleted)
task3 := taskOfSameJob(task2, api.TaskStatusQueued)
job := task3.Job
mocks.expectSaveJobWithStatus(t, job, api.JobStatusPauseRequested)
// Expect pausing of the job to trigger pausing of all its queued tasks.
mocks.persist.EXPECT().UpdateJobsTaskStatusesConditional(ctx, job,
[]api.TaskStatus{
api.TaskStatusQueued,
api.TaskStatusSoftFailed,
},
api.TaskStatusPaused,
"Manager paused this task because the job got status \"pause-requested\".",
)
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
api.TaskStatusActive).
Return(1, 3, nil)
mocks.expectBroadcastJobChangeWithTaskRefresh(job, api.JobStatusActive, api.JobStatusPauseRequested)
require.NoError(t, sm.JobStatusChange(ctx, job, api.JobStatusPauseRequested, "someone wrote a unittest"))
}
func TestCheckStuck(t *testing.T) { func TestCheckStuck(t *testing.T) {
mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t) mockCtrl, ctx, sm, mocks := taskStateMachineTestFixtures(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()