WIP: 103268-job-task-progress #104185

Draft
Nitin-Rawat-1 wants to merge 19 commits from Nitin-Rawat-1/flamenco:103268-job-task-progress into main

When changing the target branch, be careful to rebase the branch in your fork to match. See documentation.
5 changed files with 121 additions and 3 deletions
Showing only changes of commit 218e476e78 - Show all commits

View File

@ -369,9 +369,57 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
return e.JSON(http.StatusOK, customisedTask) return e.JSON(http.StatusOK, customisedTask)
} }
// TODO: Complete the immplementation for the below function // TODO: 1) Broadcast the udpates to the frontend client
// TODO: 2) Write tests for the following function
func (f *Flamenco) TaskProgressUpdate(e echo.Context, taskID string) error { func (f *Flamenco) TaskProgressUpdate(e echo.Context, taskID string) error {
return fmt.Errorf("") logger := requestLogger(e)
worker := requestWorkerOrPanic(e)
if !uuid.IsValid(taskID) {
logger.Debug().Msg("Invalid task ID received")
return sendAPIError(e, http.StatusBadRequest, "Task ID not valid")
}
logger = logger.With().Str("taskID", taskID).Logger()
// Fetch the task, to see if this worker is even allowed to send udpates.
ctx := e.Request().Context()
dbTask, err := f.persist.FetchTask(ctx, taskID)
if err != nil {
logger.Warn().Err(err).Msg("cannot fetch task")
if errors.Is(err, persistence.ErrTaskNotFound) {
return sendAPIError(e, http.StatusNotFound, "task %+v not found", taskID)
}
return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
}
if dbTask == nil {
panic("task could not be fetched, but database gave no error either")
}
// Decode the request body.
var taskProgressUpdate api.TaskProgressUpdate
if err := e.Bind(&taskProgressUpdate); err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
if dbTask.WorkerID == nil {
logger.Warn().Msg("worker trying to update task that's not assigned to any worker")
return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to any worker, so also not to you", taskID)
}
if *dbTask.WorkerID != worker.ID {
logger.Warn().Msg("worker trying to update task that's assigned to another worker")
return sendAPIError(e, http.StatusConflict, "task %+v is not assigned to you", taskID)
}
// for testing ..............................
fmt.Println("----------------------------")
fmt.Print("Progress: ")
fmt.Println(taskProgressUpdate.Progress)
fmt.Println("----------------------------")
//...........................................
return e.NoContent(http.StatusNoContent)
} }
func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error { func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error {
ctx := e.Request().Context() ctx := e.Request().Context()

View File

@ -7,6 +7,7 @@ package worker
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"os/exec" "os/exec"
"regexp" "regexp"
"sync" "sync"
@ -20,6 +21,10 @@ import (
) )
var regexpFileSaved = regexp.MustCompile("Saved: '(.*)'") var regexpFileSaved = regexp.MustCompile("Saved: '(.*)'")
var regexpFrameNumber = regexp.MustCompile("Fra:[0-9]+")
var prevFrame string

I don't see the need to declare these variables here.

I don't see the need to declare these variables here.
var renderedNumFrames int
var totalNumFrames float64
type BlenderParameters struct { type BlenderParameters struct {
exe string // Expansion of `{blender}`: the executable path defined by the Manager. exe string // Expansion of `{blender}`: the executable path defined by the Manager.
@ -47,6 +52,8 @@ func (ce *CommandExecutor) cmdBlenderRender(ctx context.Context, logger zerolog.
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
prevFrame = ""
renderedNumFrames = 0

Why are these set here? They're not used in this goroutine. Also Go initializes variables to the zero value, so it's likely to be a no-op. It can cause a race condition, though, because the goroutine runs asynchronously from the rest of the code.

Why are these set here? They're not used in this goroutine. Also Go initializes variables to the zero value, so it's likely to be a no-op. It can cause a race condition, though, because the goroutine runs asynchronously from the rest of the code.
defer wg.Done() defer wg.Done()
for line := range lineChannel { for line := range lineChannel {
ce.processLineBlender(ctx, logger, taskID, line) ce.processLineBlender(ctx, logger, taskID, line)
@ -141,6 +148,9 @@ func cmdBlenderRenderParams(logger zerolog.Logger, cmd api.Command) (BlenderPara
// Ignore the `ok` return value, as a missing numFrames key is fine: // Ignore the `ok` return value, as a missing numFrames key is fine:
parameters.numFrames, _ = cmdParameter[float64](cmd, "numFrames") parameters.numFrames, _ = cmdParameter[float64](cmd, "numFrames")
if parameters.numFrames != 0 {

This if is not necessary. totalNumFrames is initialized to zero anyway, so if paramters.numFrames == 0, the assignment can safely be done as it just reassigns zero.

This `if` is not necessary. `totalNumFrames` is initialized to zero anyway, so if `paramters.numFrames == 0`, the assignment can safely be done as it just reassigns zero.
totalNumFrames = parameters.numFrames
}
if parameters.argsBefore, ok = cmdParameterAsStrings(cmd, "argsBefore"); !ok { if parameters.argsBefore, ok = cmdParameterAsStrings(cmd, "argsBefore"); !ok {
logger.Warn().Interface("command", cmd).Msg("invalid 'argsBefore' parameter") logger.Warn().Interface("command", cmd).Msg("invalid 'argsBefore' parameter")
@ -174,6 +184,22 @@ func cmdBlenderRenderParams(logger zerolog.Logger, cmd api.Command) (BlenderPara
func (ce *CommandExecutor) processLineBlender(ctx context.Context, logger zerolog.Logger, taskID string, line string) { func (ce *CommandExecutor) processLineBlender(ctx context.Context, logger zerolog.Logger, taskID string, line string) {
// TODO: check for "Warning: Unable to open" and other indicators of missing // TODO: check for "Warning: Unable to open" and other indicators of missing
// files. Flamenco v2 updated the task.Activity field for such situations. // files. Flamenco v2 updated the task.Activity field for such situations.
renderedFrameNumber := regexpFrameNumber.FindString(line)
if renderedFrameNumber != "" && renderedFrameNumber != prevFrame {
renderedNumFrames++
prevFrame = renderedFrameNumber
var progress = int(math.Ceil(float64(renderedNumFrames) / totalNumFrames * 100))
// for checking the out of progress
fmt.Println("---------------------------")
fmt.Println(prevFrame)
fmt.Println(progress)
fmt.Println("---------------------------")
err := ce.listener.UpdateTaskProgress(ctx, taskID, progress)
if err != nil {
logger.Warn().Err(err).Msg("error send progress udpate to manager.")
}
}
match := regexpFileSaved.FindStringSubmatch(line) match := regexpFileSaved.FindStringSubmatch(line)
if len(match) < 2 { if len(match) < 2 {

View File

@ -39,6 +39,8 @@ type CommandListener interface {
LogProduced(ctx context.Context, taskID string, logLines ...string) error LogProduced(ctx context.Context, taskID string, logLines ...string) error
// OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). // OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video).
OutputProduced(ctx context.Context, taskID string, outputLocation string) error OutputProduced(ctx context.Context, taskID string, outputLocation string) error
// UpdateTaskProgress sends the progress update of the task to manager
UpdateTaskProgress(ctx context.Context, taskID string, progress int) error
} }
// TimeService is a service that operates on time. // TimeService is a service that operates on time.
@ -47,8 +49,9 @@ type TimeService interface {
Now() time.Time Now() time.Time
} }
//go:generate go run github.com/golang/mock/mockgen -destination mocks/cli_runner.gen.go -package mocks git.blender.org/flamenco/internal/worker CommandLineRunner
// CommandLineRunner is an interface around exec.CommandContext(). // CommandLineRunner is an interface around exec.CommandContext().
//
//go:generate go run github.com/golang/mock/mockgen -destination mocks/cli_runner.gen.go -package mocks git.blender.org/flamenco/internal/worker CommandLineRunner
type CommandLineRunner interface { type CommandLineRunner interface {
CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd CommandContext(ctx context.Context, name string, arg ...string) *exec.Cmd
RunWithTextOutput( RunWithTextOutput(

View File

@ -6,6 +6,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net/http"
"strings" "strings"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -91,9 +92,35 @@ func (l *Listener) OutputProduced(ctx context.Context, taskID string, outputLoca
return nil return nil
} }
func (l *Listener) UpdateTaskProgress(ctx context.Context, taskID string, progress int) error {
return l.sendProgressUpdate(ctx, taskID, api.TaskProgressUpdateJSONRequestBody{
Progress: progress,
})
}
func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error { func (l *Listener) sendTaskUpdate(ctx context.Context, taskID string, update api.TaskUpdateJSONRequestBody) error {
if ctx.Err() != nil { if ctx.Err() != nil {
return ctx.Err() return ctx.Err()
} }
return l.buffer.SendTaskUpdate(ctx, taskID, update) return l.buffer.SendTaskUpdate(ctx, taskID, update)
} }
func (l *Listener) sendProgressUpdate(ctx context.Context, taskID string, progress api.TaskProgressUpdateJSONRequestBody) error {
if ctx.Err() != nil {
return ctx.Err()
}
resp, err := l.client.TaskProgressUpdateWithResponse(ctx, taskID, progress)

This should not use the API client directly. If the Manager cannot be reached, the progress updates should be queued in the buffer, just like regular task updates.

This should not use the API client directly. If the Manager cannot be reached, the progress updates should be queued in the buffer, just like regular task updates.
if err != nil {
log.Warn().Err(err).Str("task", taskID).Msg("Error communicating with the Manager, unable to send progress update")
return fmt.Errorf("%v", err)
}
switch resp.StatusCode() {
case http.StatusNoContent:
return nil
default:
return fmt.Errorf("unknown error from Manager, code %d: %v",
resp.StatusCode(), resp.JSONDefault)
}
}

View File

@ -66,3 +66,17 @@ func (mr *MockCommandListenerMockRecorder) OutputProduced(arg0, arg1, arg2 inter
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OutputProduced", reflect.TypeOf((*MockCommandListener)(nil).OutputProduced), arg0, arg1, arg2) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OutputProduced", reflect.TypeOf((*MockCommandListener)(nil).OutputProduced), arg0, arg1, arg2)
} }
// UpdateTaskProgress mocks base method.
func (m *MockCommandListener) UpdateTaskProgress(arg0 context.Context, arg1 string, arg2 int) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateTaskProgress", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateTaskProgress indicates an expected call of UpdateTaskProgress.
func (mr *MockCommandListenerMockRecorder) UpdateTaskProgress(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTaskProgress", reflect.TypeOf((*MockCommandListener)(nil).UpdateTaskProgress), arg0, arg1, arg2)
}