Support pausing jobs #104313
@ -180,16 +180,22 @@ func (sm *StateMachine) updateJobOnTaskStatusCanceled(ctx context.Context, logge
|
|||||||
return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job")
|
return sm.JobStatusChange(ctx, job, api.JobStatusCanceled, "canceled task was last runnable task of job, canceling job")
|
||||||
}
|
}
|
||||||
|
|
||||||
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
// Deal with the special case when the job is in pause-requested status.
|
||||||
if err != nil {
|
if job.Status != api.JobStatusPauseRequested {
|
||||||
return err
|
return nil
|
||||||
}
|
} else {
|
||||||
if numActive == 0 && job.Status == api.JobStatusPauseRequested {
|
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
|
||||||
// there is no active task, and the job is in pause-requested status, so we can pause the job
|
if err != nil {
|
||||||
logger.Info().Msg("all tasks of job are completed, job is paused")
|
return err
|
||||||
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
}
|
||||||
|
if numActive == 0 {
|
||||||
|
// there is no active task, and the job is in pause-requested status, so we can pause the job
|
||||||
|
logger.Info().Msg("all tasks of job are completed, job is paused")
|
||||||
|
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Execution should not reach here.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,8 +191,6 @@ func TestTaskStatusChangeCancelSingleTask(t *testing.T) {
|
|||||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
|
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
|
||||||
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed, api.TaskStatusPaused).
|
api.TaskStatusActive, api.TaskStatusQueued, api.TaskStatusSoftFailed, api.TaskStatusPaused).
|
||||||
Return(1, 2, nil)
|
Return(1, 2, nil)
|
||||||
mocks.persist.EXPECT().CountTasksOfJobInStatus(ctx, job,
|
|
||||||
api.TaskStatusActive).Return(0, 2, nil)
|
|
||||||
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCanceled))
|
require.NoError(t, sm.TaskStatusChange(ctx, task, api.TaskStatusCanceled))
|
||||||
|
|
||||||
// T2: queued > cancelled --> J: cancel-requested > canceled
|
// T2: queued > cancelled --> J: cancel-requested > canceled
|
||||||
|
Loading…
Reference in New Issue
Block a user