Support pausing jobs #104313
@ -189,13 +189,12 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
|
||||
return err
|
||||
}
|
||||
if numActive == 0 {
|
||||
// there is no active task, and the job is in pause-requested status, so we can pause the job
|
||||
logger.Info().Msg("all tasks of job are completed, job is paused")
|
||||
// There is no active task, and the job is in pause-requested status, so we can pause the job.
|
||||
logger.Info().Msg("No more active tasks, job is paused")
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
||||
}
|
||||
}
|
||||
|
||||
// Execution should not reach here.
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -221,15 +220,21 @@ func (sm *StateMachine) updateJobOnTaskStatusFailed(ctx context.Context, logger
|
||||
// If the job didn't fail, this failure indicates that at least the job is active.
|
||||
failLogger.Info().Msg("task failed, but not enough to fail the job")
|
||||
|
||||
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numActive == 0 && job.Status == api.JobStatusPauseRequested {
|
||||
// there is no active task, and the job is in pause-requested status, so we can pause the job
|
||||
logger.Info().Msg("all tasks of job are completed, job is paused")
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
||||
if job.Status != api.JobStatusPauseRequested {
|
||||
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive,
|
||||
"task failed, but not enough to fail the job")
|
||||
} else {
|
||||
David-Zhang-10 marked this conversation as resolved
Outdated
|
||||
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numActive == 0 {
|
||||
// There is no active task, and the job is in pause-requested status, so we can pause the job.
|
||||
failLogger.Info().Msg("No more active tasks, job is paused")
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
||||
}
|
||||
}
|
||||
|
||||
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive,
|
||||
"task failed, but not enough to fail the job")
|
||||
}
|
||||
@ -240,19 +245,29 @@ func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logg
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numComplete == numTotal {
|
||||
logger.Info().Msg("all tasks of job are completed, job is completed")
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed")
|
||||
}
|
||||
if numActive == 0 && job.Status == api.JobStatusPauseRequested {
|
||||
// there is no active task, and the job is in pause-requested status, so we can pause the job
|
||||
logger.Info().Msg("all tasks of job are completed, job is paused")
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
||||
|
||||
if job.Status != api.JobStatusPauseRequested {
|
||||
logger.Info().
|
||||
Int("taskNumTotal", numTotal).
|
||||
Int("taskNumComplete", numComplete).
|
||||
Msg("task completed; there are more tasks to do")
|
||||
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive, "no more tasks to do")
|
||||
} else {
|
||||
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numActive == 0 {
|
||||
// There is no active task, and the job is in pause-requested status, so we can pause the job.
|
||||
logger.Info().Msg("No more active tasks, job is paused")
|
||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info().
|
||||
Int("taskNumTotal", numTotal).
|
||||
Int("taskNumComplete", numComplete).
|
||||
|
@ -76,7 +76,6 @@ func TestTaskStatusChangeActiveToCompleted(t *testing.T) {
|
||||
mocks.expectWriteTaskLogTimestamped(t, task, "task changed status active -> completed")
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusActive, api.TaskStatusCompleted)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(1, 3, nil) // 1 of 3 complete.
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusActive).Return(2, 3, nil) // 2 of 3 active.
|
||||
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCompleted))
|
||||
|
||||
// Second task hickup: T: active > soft-failed --> J: active > active
|
||||
@ -90,7 +89,6 @@ func TestTaskStatusChangeActiveToCompleted(t *testing.T) {
|
||||
mocks.expectWriteTaskLogTimestamped(t, task2, "task changed status soft-failed -> completed")
|
||||
mocks.expectBroadcastTaskChange(task2, api.TaskStatusSoftFailed, api.TaskStatusCompleted)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(2, 3, nil) // 2 of 3 complete.
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusActive).Return(1, 3, nil) // 1 of 3 active.
|
||||
require.NoError(t, sm.TaskStatusChange(ctx, task2, api.TaskStatusCompleted))
|
||||
|
||||
// Third task completing: T: active > completed --> J: active > completed
|
||||
@ -98,7 +96,6 @@ func TestTaskStatusChangeActiveToCompleted(t *testing.T) {
|
||||
mocks.expectWriteTaskLogTimestamped(t, task3, "task changed status active -> completed")
|
||||
mocks.expectBroadcastTaskChange(task3, api.TaskStatusActive, api.TaskStatusCompleted)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusCompleted).Return(3, 3, nil) // 3 of 3 complete.
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusActive).Return(0, 3, nil) // 0 of 3 active.
|
||||
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusCompleted)
|
||||
mocks.expectBroadcastJobChange(task.Job, api.JobStatusActive, api.JobStatusCompleted)
|
||||
|
||||
@ -116,7 +113,6 @@ func TestTaskStatusChangeQueuedToFailed(t *testing.T) {
|
||||
mocks.expectBroadcastTaskChange(task, api.TaskStatusQueued, api.TaskStatusFailed)
|
||||
mocks.expectSaveJobWithStatus(t, task.Job, api.JobStatusActive)
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusFailed).Return(1, 100, nil) // 1 out of 100 failed.
|
||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, task.Job, api.TaskStatusActive).Return(0, 100, nil) // 0 out of 100 active.
|
||||
mocks.expectBroadcastJobChange(task.Job, api.JobStatusQueued, api.JobStatusActive)
|
||||
|
||||
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusFailed))
|
||||
|
Loading…
Reference in New Issue
Block a user
I don't think this is the right structure. The
if {}
block now contains a copy of the code below it, which is error-prone. There should be one piece of code responsible for one thing. By duplicating this code, there is the chance that one of the copies goes out of sync with the other, introducing hard to reason about bugs.Also, don't use
else
after areturn
, it doesn't mean anything.Better to just use:
It'll also make it easier to move the code into a function of its own. It's the same "single responsibility principle" as before: there should be one piece of code that checks whether pausing a job is complete.