WIP: Allow jobs to be submitted in paused status #104318

Closed
David Zhang wants to merge 32 commits from David-Zhang-10/flamenco:submit-as-paused into main

When changing the target branch, be careful to rebase the branch in your fork to match. See documentation.
Showing only changes of commit 050c779995 - Show all commits

View File

@ -5,8 +5,6 @@ package task_state_machine
import ( import (
"context" "context"
"fmt" "fmt"
"projects.blender.org/studio/flamenco/pkg/website"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -120,6 +118,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeueing, "task was queued") return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeueing, "task was queued")
case api.TaskStatusPaused: case api.TaskStatusPaused:
// Pausing a task has no impact on the job.
return nil return nil
case api.TaskStatusCanceled: case api.TaskStatusCanceled:
@ -214,10 +213,19 @@ func (sm *StateMachine) updateJobOnTaskStatusCompleted(ctx context.Context, logg
if err != nil { if err != nil {
return err return err
} }
numActive, _, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusActive)
if err != nil {
return err
}
if numComplete == numTotal { if numComplete == numTotal {
logger.Info().Msg("all tasks of job are completed, job is completed") logger.Info().Msg("all tasks of job are completed, job is completed")
return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed") return sm.JobStatusChange(ctx, job, api.JobStatusCompleted, "all tasks completed")
} }
if numActive == 0 && job.Status == api.JobStatusPauseRequested {
// there is no active task, and the job is in pause-requested status, so we can pause the job
logger.Info().Msg("all tasks of job are completed, job is paused")
return sm.JobStatusChange(ctx, job, api.JobStatusPaused, "all tasks completed")
}
logger.Info(). logger.Info().
Int("taskNumTotal", numTotal). Int("taskNumTotal", numTotal).
Int("taskNumComplete", numComplete). Int("taskNumComplete", numComplete).
@ -453,7 +461,6 @@ func (sm *StateMachine) pauseTasks(
// Any task that might run in the future should get paused. // Any task that might run in the future should get paused.
// Active jobs should remain active until finished // Active jobs should remain active until finished
taskStatusesToPause := []api.TaskStatus{ taskStatusesToPause := []api.TaskStatus{
api.TaskStatusActive,
api.TaskStatusQueued, api.TaskStatusQueued,
api.TaskStatusCanceled, api.TaskStatusCanceled,
api.TaskStatusSoftFailed, api.TaskStatusSoftFailed,
@ -466,15 +473,7 @@ func (sm *StateMachine) pauseTasks(
return "", fmt.Errorf("pausing tasks of job %s: %w", job.UUID, err) return "", fmt.Errorf("pausing tasks of job %s: %w", job.UUID, err)
} }
// If pause was requested, it has now happened, so the job can transition. return api.JobStatusPauseRequested, nil
if job.Status == api.JobStatusPauseRequested {
logger.Info().Msg("all tasks of job paused, job can go to 'paused' status")
return api.JobStatusPaused, nil
}
// This could mean state transition entered a non-recoverable error state.
log.Warn().Str("jobStatus", string(job.Status)).Msgf("unexpected job status in StateMachine::pauseTasks(), please report this at %s", website.BugReportURL)
return "", fmt.Errorf("unexpected job status %q in StateMachine::pauseTasks()", job.Status)
} }
// requeueTasks re-queues all tasks of the job. // requeueTasks re-queues all tasks of the job.