Distributed rendering of single images #104327

Merged
David Zhang merged 22 commits from David-Zhang-10/flamenco:single-image-render into main 2024-09-03 06:47:49 +02:00
28 changed files with 542 additions and 123 deletions
Showing only changes of commit 051210dc45 - Show all commits

View File

@ -11,6 +11,8 @@ bugs in actually-released versions.
- Make it possible to script job submissions in Blender, by executing the `bpy.ops.flamenco.submit_job(job_name="jobname")` operator.
- Security updates of some deendencies:
- [GO-2024-2937: Parsing a corrupt or malicious image with invalid color indices can cause a panic](https://pkg.go.dev/vuln/GO-2024-2937)
- Web interface: list the job's worker tag in the job details.
- Ensure the submitted scene is rendered in a multi-scene blend file.
## 3.5 - released 2024-04-16

View File

@ -32,6 +32,7 @@ MAX_FAILED_PATHS = 8
HashableShamanFileSpec = tuple[str, int, str]
"""Tuple of the 'sha', 'size', and 'path' fields of a ShamanFileSpec."""
# Mypy doesn't understand that submodules.pack.Packer exists.
class Packer(submodules.pack.Packer): # type: ignore
"""Creates BAT Packs on a Shaman server."""

View File

@ -42,7 +42,9 @@ class FLAMENCO_PT_job_submission(bpy.types.Panel):
col = layout.column(align=True)
col.prop(context.scene, "flamenco_job_name", text="Job Name")
col.prop(context.scene, "flamenco_job_priority", text="Priority")
col.prop(context.scene, "flamenco_job_submit_as_paused", text="Submit as Paused")
col.prop(
context.scene, "flamenco_job_submit_as_paused", text="Submit as Paused"
)
# Refreshables:
col = layout.column(align=True)
@ -51,7 +53,7 @@ class FLAMENCO_PT_job_submission(bpy.types.Panel):
)
if not job_types.are_job_types_available():
return
col.prop(context.scene, "flamenco_worker_tag", text="Tag")
col.prop(context.scene, "flamenco_worker_tag", text="Worker Tag")
# Job properties:
job_col = layout.column(align=True)

View File

@ -335,7 +335,14 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
)
prefs.experimental.use_all_linked_data_direct = True
filepath = Path(context.blend_data.filepath).with_suffix(".flamenco.blend")
filepath = Path(context.blend_data.filepath)
if job_submission.is_file_inside_job_storage(context, filepath):
self.log.info(
"Saving blendfile, already in shared storage: %s", filepath
)
bpy.ops.wm.save_as_mainfile()
else:
filepath = filepath.with_suffix(".flamenco.blend")
self.log.info("Saving copy to temporary file %s", filepath)
bpy.ops.wm.save_as_mainfile(
filepath=str(filepath), compress=True, copy=True

View File

@ -148,7 +148,13 @@ func runFlamencoManager() bool {
log.Fatal().Err(err).Msg("unable to figure out my own URL")
}
ssdp := makeAutoDiscoverable(urls)
// Construct the UPnP/SSDP server.
var ssdp *upnp_ssdp.Server
if configService.Get().SSDPDiscovery {
ssdp = makeAutoDiscoverable(urls)
} else {
log.Debug().Msg("UPnP/SSDP autodiscovery disabled in configuration")
}
// Construct the services.
persist := openDB(*configService)

View File

@ -89,6 +89,56 @@ func TestQueryJobs(t *testing.T) {
assertResponseJSON(t, echoCtx, http.StatusOK, expectedJobs)
}
func TestFetchJob(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
dbJob := persistence.Job{
UUID: "afc47568-bd9d-4368-8016-e91d945db36d",
Name: "работа",
JobType: "test",
Priority: 50,
Status: api.JobStatusActive,
Settings: persistence.StringInterfaceMap{
"result": "/render/frames/exploding.kittens",
},
Metadata: persistence.StringStringMap{
"project": "/projects/exploding-kittens",
},
WorkerTag: &persistence.WorkerTag{
UUID: "d86e1b84-5ee2-4784-a178-65963eeb484b",
Name: "Tikkie terug Kees!",
Description: "",
},
}
echoCtx := mf.prepareMockedRequest(nil)
mf.persistence.EXPECT().FetchJob(gomock.Any(), dbJob.UUID).Return(&dbJob, nil)
require.NoError(t, mf.flamenco.FetchJob(echoCtx, dbJob.UUID))
expectedJob := api.Job{
SubmittedJob: api.SubmittedJob{
Name: "работа",
Type: "test",
Priority: 50,
Settings: &api.JobSettings{AdditionalProperties: map[string]interface{}{
"result": "/render/frames/exploding.kittens",
}},
Metadata: &api.JobMetadata{AdditionalProperties: map[string]string{
"project": "/projects/exploding-kittens",
}},
WorkerTag: ptr("d86e1b84-5ee2-4784-a178-65963eeb484b"),
},
Id: "afc47568-bd9d-4368-8016-e91d945db36d",
Status: api.JobStatusActive,
}
assertResponseJSON(t, echoCtx, http.StatusOK, expectedJob)
}
func TestFetchTask(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

View File

@ -243,6 +243,85 @@ func TestReplaceTwoWayVariables(t *testing.T) {
}
}
// TestReplaceTwoWayVariablesFFmpegExpression tests that slashes (for division)
// in an FFmpeg filter expression are NOT replaced with backslashes when sending
// to a Windows worker.
func TestReplaceTwoWayVariablesFFmpegExpression(t *testing.T) {
c := config.DefaultConfig(func(c *config.Conf) {
// Mock that the Manager is running Linux.
c.MockCurrentGOOSForTests("linux")
// Trigger a translation of a path in the FFmpeg command arguments.
c.Variables["project"] = config.Variable{
IsTwoWay: true,
Values: []config.VariableValue{
{Value: "/projects/charge", Platform: config.VariablePlatformLinux, Audience: config.VariableAudienceAll},
{Value: `P:\charge`, Platform: config.VariablePlatformWindows, Audience: config.VariableAudienceAll},
},
}
})
task := api.AssignedTask{
Job: "f0bde4d0-eaaf-4ee0-976b-802a86aa2d02",
JobPriority: 50,
JobType: "simple-blender-render",
Name: "preview-video",
Priority: 50,
Status: api.TaskStatusQueued,
TaskType: "ffmpeg",
Uuid: "fd963a82-2e98-4a39-9bd4-c302e5b8814f",
Commands: []api.Command{
{
Name: "frames-to-video",
Parameters: map[string]interface{}{
"exe": "ffmpeg", // Should not change.
"fps": 24, // Should not change type.
"inputGlob": "/projects/charge/renders/*.webp", // Path, should change.
"outputFile": "/projects/charge/renders/video.mp4", // Path, should change.
"args": []string{
"-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", // Should not change.
"-fake-lut", `/projects/charge/ffmpeg.lut`, // Path, should change.
},
},
},
},
}
worker := persistence.Worker{Platform: "windows"}
replacedTask := replaceTaskVariables(&c, task, worker)
expectTask := api.AssignedTask{
Job: "f0bde4d0-eaaf-4ee0-976b-802a86aa2d02",
JobPriority: 50,
JobType: "simple-blender-render",
Name: "preview-video",
Priority: 50,
Status: api.TaskStatusQueued,
TaskType: "ffmpeg",
Uuid: "fd963a82-2e98-4a39-9bd4-c302e5b8814f",
Commands: []api.Command{
{
Name: "frames-to-video",
Parameters: map[string]interface{}{
"exe": "ffmpeg",
"fps": 24,
// These two parameters matched a two-way variable:
"inputGlob": `P:\charge\renders\*.webp`,
"outputFile": `P:\charge\renders\video.mp4`,
"args": []string{
// This parameter should not change:
"-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
// This parameter should change:
"-fake-lut", `P:\charge\ffmpeg.lut`,
},
},
},
},
}
assert.Equal(t, expectTask, replacedTask)
}
func varReplSubmittedJob() api.SubmittedJob {
return api.SubmittedJob{
Type: "simple-blender-render",
@ -273,7 +352,7 @@ func varReplSubmittedJob() api.SubmittedJob {
}
// jsonWash converts the given value to JSON and back.
// This makes sure the types are as closed to what the API will handle as
// This makes sure the types are as close to what the API will handle as
// possible, making the difference between "array of strings" and "array of
// interface{}s that happen to be strings".
func jsonWash[T any](value T) T {

View File

@ -313,8 +313,11 @@ func (f *Flamenco) FetchWorkerTag(e echo.Context, tagUUID string) error {
logger.Error().Err(err).Msg("fetching worker tag")
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tag: %v", err)
}
if tag == nil {
panic("Could fetch a worker tag without error, but then the returned tag was still nil")
}
return e.JSON(http.StatusOK, workerTagDBtoAPI(*tag))
return e.JSON(http.StatusOK, workerTagDBtoAPI(tag))
}
func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error {
@ -387,8 +390,8 @@ func (f *Flamenco) FetchWorkerTags(e echo.Context) error {
apiTags := []api.WorkerTag{}
for _, dbTag := range dbTags {
apiTag := workerTagDBtoAPI(*dbTag)
apiTags = append(apiTags, apiTag)
apiTag := workerTagDBtoAPI(dbTag)
apiTags = append(apiTags, *apiTag)
}
tagList := api.WorkerTagList{
@ -443,7 +446,7 @@ func (f *Flamenco) CreateWorkerTag(e echo.Context) error {
sioUpdate := eventbus.NewWorkerTagUpdate(&dbTag)
f.broadcaster.BroadcastNewWorkerTag(sioUpdate)
return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag))
return e.JSON(http.StatusOK, workerTagDBtoAPI(&dbTag))
}
func workerSummary(w persistence.Worker) api.WorkerSummary {
@ -479,7 +482,7 @@ func workerDBtoAPI(w persistence.Worker) api.Worker {
if len(w.Tags) > 0 {
tags := []api.WorkerTag{}
for i := range w.Tags {
tags = append(tags, workerTagDBtoAPI(*w.Tags[i]))
tags = append(tags, *workerTagDBtoAPI(w.Tags[i]))
}
apiWorker.Tags = &tags
}
@ -487,7 +490,11 @@ func workerDBtoAPI(w persistence.Worker) api.Worker {
return apiWorker
}
func workerTagDBtoAPI(wc persistence.WorkerTag) api.WorkerTag {
func workerTagDBtoAPI(wc *persistence.WorkerTag) *api.WorkerTag {
if wc == nil {
return nil
}
uuid := wc.UUID // Take a copy for safety.
apiTag := api.WorkerTag{
@ -497,5 +504,5 @@ func workerTagDBtoAPI(wc persistence.WorkerTag) api.WorkerTag {
if len(wc.Description) > 0 {
apiTag.Description = &wc.Description
}
return apiTag
return &apiTag
}

View File

@ -106,6 +106,10 @@ func (ve *VariableExpander) Expand(valueToExpand string) string {
isPathValue := false
for varname, varvalue := range ve.targetTwoWayVars {
placeholder := fmt.Sprintf("{%s}", varname)
if !strings.Contains(expanded, placeholder) {
continue
}
expanded = strings.Replace(expanded, placeholder, varvalue, -1)
// Since two-way variables are meant for path replacement, we know this

View File

@ -139,6 +139,41 @@ func TestSimpleBlenderRenderHappy(t *testing.T) {
assert.Equal(t, expectDeps, tVideo.Dependencies)
}
func TestSimpleBlenderRenderWithScene(t *testing.T) {
c := mockedClock(t)
s, err := Load(c)
require.NoError(t, err)
// Compiling a job should be really fast.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
sj := exampleSubmittedJob()
sj.Settings.AdditionalProperties["scene"] = "Test Scene"
aj, err := s.Compile(ctx, sj)
require.NoError(t, err)
require.NotNil(t, aj)
t0 := aj.Tasks[0]
expectCliArgs := []interface{}{ // They are strings, but Goja doesn't know that and will produce an []interface{}.
"--scene", "Test Scene",
"--render-output", "/render/sprites/farm_output/promo/square_ellie/square_ellie.lighting_light_breakdown2/######",
"--render-format", "PNG",
"--render-frame", "1..3",
}
assert.Equal(t, "render-1-3", t0.Name)
assert.Equal(t, 1, len(t0.Commands))
assert.Equal(t, "blender-render", t0.Commands[0].Name)
assert.EqualValues(t, AuthoredCommandParameters{
"exe": "{blender}",
"exeArgs": "{blenderArgs}",
"blendfile": "/render/sf/jobs/scene123.blend",
"args": expectCliArgs,
"argsBefore": make([]interface{}, 0),
}, t0.Commands[0].Parameters)
}
func TestJobWithoutTag(t *testing.T) {
c := mockedClock(t)

View File

@ -32,6 +32,8 @@ const JOB_TYPE = {
description: "File extension used when rendering images" },
{ key: "has_previews", type: "bool", required: false, eval: "C.scene.render.image_settings.use_preview", visible: "hidden",
description: "Whether Blender will render preview images."},
{ key: "scene", type: "string", required: true, eval: "C.scene.name", visible: "web",
description: "Name of the scene to render."},
]
};
@ -100,6 +102,12 @@ function authorRenderTasks(settings, renderDir, renderOutput) {
print("authorRenderTasks(", renderDir, renderOutput, ")");
let renderTasks = [];
let chunks = frameChunker(settings.frames, settings.chunk_size);
let baseArgs = [];
if (settings.scene) {
baseArgs = baseArgs.concat(["--scene", settings.scene]);
}
for (let chunk of chunks) {
const task = author.Task(`render-${chunk}`, "blender");
const command = author.Command("blender-render", {
@ -107,11 +115,11 @@ function authorRenderTasks(settings, renderDir, renderOutput) {
exeArgs: "{blenderArgs}",
argsBefore: [],
blendfile: settings.blendfile,
args: [
args: baseArgs.concat([
"--render-output", path.join(renderDir, path.basename(renderOutput)),
"--render-format", settings.format,
"--render-frame", chunk.replaceAll("-", ".."), // Convert to Blender frame range notation.
]
])
});
task.addCommand(command);
renderTasks.push(task);

View File

@ -359,6 +359,18 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
if err != nil {
return nil, err
}
if sqlcJob.WorkerTagID.Valid {
workerTag, err := fetchWorkerTagByID(db.gormDB, uint(sqlcJob.WorkerTagID.Int64))
switch {
case errors.Is(err, sql.ErrNoRows):
return nil, ErrWorkerTagNotFound
case err != nil:
return nil, workerTagError(err, "fetching worker tag of job")
}
gormJob.WorkerTag = workerTag
}
return &gormJob, nil
}

View File

@ -7,7 +7,7 @@ import (
"math"
"time"
"gorm.io/gorm/clause"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
)
// JobBlock keeps track of which Worker is not allowed to run which task type on which job.
@ -28,66 +28,76 @@ type JobBlock struct {
// AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler.
func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Worker, taskType string) error {
entry := JobBlock{
Job: job,
Worker: worker,
if job.ID == 0 {
panic("Cannot add worker to job blocklist with zero job ID")
}
if worker.ID == 0 {
panic("Cannot add worker to job blocklist with zero worker ID")
}
if taskType == "" {
panic("Cannot add worker to job blocklist with empty task type")
}
queries, err := db.queries()
if err != nil {
return err
}
return queries.AddWorkerToJobBlocklist(ctx, sqlc.AddWorkerToJobBlocklistParams{
CreatedAt: db.now().Time,
JobID: int64(job.ID),
WorkerID: int64(worker.ID),
TaskType: taskType,
}
tx := db.gormDB.WithContext(ctx).
Clauses(clause.OnConflict{DoNothing: true}).
Create(&entry)
return tx.Error
})
}
// FetchJobBlocklist fetches the blocklist for the given job.
// Workers are fetched too, and embedded in the returned list.
func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock, error) {
entries := []JobBlock{}
queries, err := db.queries()
if err != nil {
return nil, err
}
tx := db.gormDB.WithContext(ctx).
Model(JobBlock{}).
Joins("inner join jobs on jobs.id = job_blocks.job_id").
Joins("Worker").
Where("jobs.uuid = ?", jobUUID).
Order("Worker.name").
Scan(&entries)
return entries, tx.Error
rows, err := queries.FetchJobBlocklist(ctx, jobUUID)
if err != nil {
return nil, err
}
entries := make([]JobBlock, len(rows))
for idx, row := range rows {
entries[idx].ID = uint(row.JobBlock.ID)
entries[idx].CreatedAt = row.JobBlock.CreatedAt
entries[idx].TaskType = row.JobBlock.TaskType
entries[idx].JobID = uint(row.JobBlock.JobID)
entries[idx].WorkerID = uint(row.JobBlock.WorkerID)
worker := convertSqlcWorker(row.Worker)
entries[idx].Worker = &worker
}
return entries, nil
}
// ClearJobBlocklist removes the entire blocklist of this job.
func (db *DB) ClearJobBlocklist(ctx context.Context, job *Job) error {
tx := db.gormDB.WithContext(ctx).
Where("job_id = ?", job.ID).
Delete(JobBlock{})
return tx.Error
queries, err := db.queries()
if err != nil {
return err
}
return queries.ClearJobBlocklist(ctx, job.UUID)
}
func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error {
// Find the job ID.
job := Job{}
tx := db.gormDB.WithContext(ctx).
Select("id").
Where("uuid = ?", jobUUID).
Find(&job)
if tx.Error != nil {
return jobError(tx.Error, "fetching job with uuid=%q", jobUUID)
queries, err := db.queries()
if err != nil {
return err
}
// Find the worker ID.
worker := Worker{}
tx = db.gormDB.WithContext(ctx).
Select("id").
Where("uuid = ?", workerUUID).
Find(&worker)
if tx.Error != nil {
return workerError(tx.Error, "fetching worker with uuid=%q", workerUUID)
}
// Remove the blocklist entry.
tx = db.gormDB.WithContext(ctx).
Where("job_id = ?", job.ID).
Where("worker_id = ?", worker.ID).
Where("task_type = ?", taskType).
Delete(JobBlock{})
return tx.Error
return queries.RemoveFromJobBlocklist(ctx, sqlc.RemoveFromJobBlocklistParams{
JobUUID: jobUUID,
WorkerUUID: workerUUID,
TaskType: taskType,
})
}
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.

View File

@ -75,6 +75,38 @@ func TestStoreAuthoredJobWithShamanCheckoutID(t *testing.T) {
assert.Equal(t, job.Storage.ShamanCheckoutID, fetchedJob.Storage.ShamanCheckoutID)
}
func TestStoreAuthoredJobWithWorkerTag(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(1 * time.Second)
defer cancel()
workerTagUUID := "daa811ac-6861-4004-8748-7700aebc244c"
require.NoError(t, db.CreateWorkerTag(ctx, &WorkerTag{
UUID: workerTagUUID,
Name: "🐈",
Description: "Mrieuw",
}))
workerTag, err := db.FetchWorkerTag(ctx, workerTagUUID)
require.NoError(t, err)
job := createTestAuthoredJobWithTasks()
job.WorkerTagUUID = workerTagUUID
err = db.StoreAuthoredJob(ctx, job)
require.NoError(t, err)
fetchedJob, err := db.FetchJob(ctx, job.JobID)
require.NoError(t, err)
require.NotNil(t, fetchedJob)
require.NotNil(t, fetchedJob.WorkerTagID)
assert.Equal(t, *fetchedJob.WorkerTagID, workerTag.ID)
require.NotNil(t, fetchedJob.WorkerTag)
assert.Equal(t, fetchedJob.WorkerTag.Name, workerTag.Name)
assert.Equal(t, fetchedJob.WorkerTag.Description, workerTag.Description)
assert.Equal(t, fetchedJob.WorkerTag.UUID, workerTagUUID)
}
func TestFetchTaskJobUUID(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(1 * time.Second)
defer cancel()

View File

@ -244,3 +244,28 @@ ON CONFLICT DO UPDATE
-- name: GetLastRenderedJobUUID :one
SELECT uuid FROM jobs
INNER JOIN last_rendereds LR ON jobs.id = LR.job_id;
-- name: AddWorkerToJobBlocklist :exec
-- Add a worker to a job's blocklist.
INSERT INTO job_blocks (created_at, job_id, worker_id, task_type)
VALUES (@created_at, @job_id, @worker_id, @task_type)
ON CONFLICT DO NOTHING;
-- name: FetchJobBlocklist :many
SELECT sqlc.embed(job_blocks), sqlc.embed(workers)
FROM job_blocks
INNER JOIN jobs ON jobs.id = job_blocks.job_id
INNER JOIN workers on workers.id = job_blocks.worker_id
WHERE jobs.uuid = @jobuuid
ORDER BY workers.name;
-- name: ClearJobBlocklist :exec
DELETE FROM job_blocks
WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid);
-- name: RemoveFromJobBlocklist :exec
DELETE FROM job_blocks
WHERE
job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid)
AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=@workeruuid)
AND job_blocks.task_type = @task_type;

View File

@ -13,6 +13,30 @@ import (
"time"
)
const addWorkerToJobBlocklist = `-- name: AddWorkerToJobBlocklist :exec
INSERT INTO job_blocks (created_at, job_id, worker_id, task_type)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT DO NOTHING
`
type AddWorkerToJobBlocklistParams struct {
CreatedAt time.Time
JobID int64
WorkerID int64
TaskType string
}
// Add a worker to a job's blocklist.
func (q *Queries) AddWorkerToJobBlocklist(ctx context.Context, arg AddWorkerToJobBlocklistParams) error {
_, err := q.db.ExecContext(ctx, addWorkerToJobBlocklist,
arg.CreatedAt,
arg.JobID,
arg.WorkerID,
arg.TaskType,
)
return err
}
const addWorkerToTaskFailedList = `-- name: AddWorkerToTaskFailedList :exec
INSERT INTO task_failures (created_at, task_id, worker_id)
VALUES (?1, ?2, ?3)
@ -50,6 +74,16 @@ func (q *Queries) ClearFailureListOfTask(ctx context.Context, taskID int64) erro
return err
}
const clearJobBlocklist = `-- name: ClearJobBlocklist :exec
DELETE FROM job_blocks
WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1)
`
func (q *Queries) ClearJobBlocklist(ctx context.Context, jobuuid string) error {
_, err := q.db.ExecContext(ctx, clearJobBlocklist, jobuuid)
return err
}
const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one
SELECT count(*) as num_failed FROM task_failures
WHERE task_id=?1
@ -217,6 +251,65 @@ func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) {
return i, err
}
const fetchJobBlocklist = `-- name: FetchJobBlocklist :many
SELECT job_blocks.id, job_blocks.created_at, job_blocks.job_id, job_blocks.worker_id, job_blocks.task_type, workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart
FROM job_blocks
INNER JOIN jobs ON jobs.id = job_blocks.job_id
INNER JOIN workers on workers.id = job_blocks.worker_id
WHERE jobs.uuid = ?1
ORDER BY workers.name
`
type FetchJobBlocklistRow struct {
JobBlock JobBlock
Worker Worker
}
func (q *Queries) FetchJobBlocklist(ctx context.Context, jobuuid string) ([]FetchJobBlocklistRow, error) {
rows, err := q.db.QueryContext(ctx, fetchJobBlocklist, jobuuid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []FetchJobBlocklistRow
for rows.Next() {
var i FetchJobBlocklistRow
if err := rows.Scan(
&i.JobBlock.ID,
&i.JobBlock.CreatedAt,
&i.JobBlock.JobID,
&i.JobBlock.WorkerID,
&i.JobBlock.TaskType,
&i.Worker.ID,
&i.Worker.CreatedAt,
&i.Worker.UpdatedAt,
&i.Worker.UUID,
&i.Worker.Secret,
&i.Worker.Name,
&i.Worker.Address,
&i.Worker.Platform,
&i.Worker.Software,
&i.Worker.Status,
&i.Worker.LastSeenAt,
&i.Worker.StatusRequested,
&i.Worker.LazyStatusRequest,
&i.Worker.SupportedTaskTypes,
&i.Worker.DeletedAt,
&i.Worker.CanRestart,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const fetchJobByID = `-- name: FetchJobByID :one
SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs
WHERE id = ? LIMIT 1
@ -758,6 +851,25 @@ func (q *Queries) JobCountTasksInStatus(ctx context.Context, arg JobCountTasksIn
return num_tasks, err
}
const removeFromJobBlocklist = `-- name: RemoveFromJobBlocklist :exec
DELETE FROM job_blocks
WHERE
job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1)
AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=?2)
AND job_blocks.task_type = ?3
`
type RemoveFromJobBlocklistParams struct {
JobUUID string
WorkerUUID string
TaskType string
}
func (q *Queries) RemoveFromJobBlocklist(ctx context.Context, arg RemoveFromJobBlocklistParams) error {
_, err := q.db.ExecContext(ctx, removeFromJobBlocklist, arg.JobUUID, arg.WorkerUUID, arg.TaskType)
return err
}
const requestJobDeletion = `-- name: RequestJobDeletion :exec
UPDATE jobs SET
updated_at = ?1,

View File

@ -53,6 +53,16 @@ func fetchWorkerTag(gormDB *gorm.DB, uuid string) (*WorkerTag, error) {
return &w, nil
}
// fetchWorkerTagByID fetches the worker tag using the given database instance.
func fetchWorkerTagByID(gormDB *gorm.DB, id uint) (*WorkerTag, error) {
w := WorkerTag{}
tx := gormDB.First(&w, "id = ?", id)
if tx.Error != nil {
return nil, workerTagError(tx.Error, "fetching worker tag")
}
return &w, nil
}
func (db *DB) SaveWorkerTag(ctx context.Context, tag *WorkerTag) error {
if err := db.gormDB.WithContext(ctx).Save(tag).Error; err != nil {
return workerTagError(err, "saving worker tag")

View File

@ -7,6 +7,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/shaman/filestore"
"projects.blender.org/studio/flamenco/pkg/shaman/testsupport"
@ -56,7 +57,7 @@ func TestCheckout(t *testing.T) {
func assertLinksTo(t *testing.T, linkPath, expectedTarget string) {
actualTarget, err := os.Readlink(linkPath)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, expectedTarget, actualTarget)
}

View File

@ -24,15 +24,18 @@ package checkout
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/mattn/go-colorable"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/shaman/config"
"projects.blender.org/studio/flamenco/pkg/shaman/filestore"
@ -40,6 +43,9 @@ import (
)
func createTestManager() (*Manager, func()) {
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
log.Logger = log.Output(output)
conf, confCleanup := config.CreateTestConfig()
fileStore := filestore.New(conf)
manager := NewManager(conf, fileStore)
@ -54,33 +60,33 @@ func TestSymlinkToCheckout(t *testing.T) {
// Fake an older file.
blobPath := filepath.Join(manager.checkoutBasePath, "jemoeder.blob")
err := ioutil.WriteFile(blobPath, []byte("op je hoofd"), 0600)
assert.NoError(t, err)
err := os.WriteFile(blobPath, []byte("op je hoofd"), 0600)
require.NoError(t, err)
wayBackWhen := time.Now().Add(-time.Hour * 24 * 100)
err = os.Chtimes(blobPath, wayBackWhen, wayBackWhen)
assert.NoError(t, err)
require.NoError(t, err)
symlinkRelativePath := "path/to/jemoeder.txt"
err = manager.SymlinkToCheckout(blobPath, manager.checkoutBasePath, symlinkRelativePath)
assert.NoError(t, err)
require.NoError(t, err)
err = manager.SymlinkToCheckout(blobPath, manager.checkoutBasePath, symlinkRelativePath)
assert.NoError(t, err, "symlinking a file twice should not be an issue")
require.NoError(t, err, "symlinking a file twice should not be an issue")
// Wait for touch() calls to be done.
manager.wg.Wait()
// The blob should have been touched to indicate it was referenced just now.
stat, err := os.Stat(blobPath)
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t,
stat.ModTime().After(wayBackWhen),
"File must be touched (%v must be later than %v)", stat.ModTime(), wayBackWhen)
symlinkPath := filepath.Join(manager.checkoutBasePath, symlinkRelativePath)
stat, err = os.Lstat(symlinkPath)
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t, stat.Mode()&os.ModeType == os.ModeSymlink,
"%v should be a symlink", symlinkPath)
}

View File

@ -27,6 +27,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -44,7 +45,7 @@ func TestReportRequirements(t *testing.T) {
}
response, err := manager.ReportRequirements(context.Background(), required)
assert.NoError(t, err)
require.NoError(t, err)
// We should not be required to upload the same file twice, so the duplicate
// should not be in the response.

View File

@ -23,7 +23,6 @@
package shaman
import (
"errors"
"fmt"
"io/fs"
"os"
@ -31,8 +30,12 @@ import (
"testing"
"time"
"github.com/mattn/go-colorable"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/shaman/config"
"projects.blender.org/studio/flamenco/pkg/shaman/filestore"
"projects.blender.org/studio/flamenco/pkg/shaman/jwtauth"
@ -40,6 +43,9 @@ import (
)
func createTestShaman() (*Server, func()) {
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
log.Logger = log.Output(output)
conf, confCleanup := config.CreateTestConfig()
shaman := NewServer(conf, jwtauth.AlwaysDeny{})
return shaman, confCleanup
@ -101,7 +107,7 @@ func TestGCFindOldFiles(t *testing.T) {
// Since all the links have just been created, nothing should be considered old.
ageThreshold := server.gcAgeThreshold()
old, err := server.gcFindOldFiles(ageThreshold, log.With().Str("test", "test").Logger())
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, mtimeMap{}, old)
// Make some files old, they should show up in a scan.
@ -111,7 +117,7 @@ func TestGCFindOldFiles(t *testing.T) {
makeOld(server, expectOld, "stored/dc/89f15de821ad1df3e78f8ef455e653a2d1862f2eb3f5ee78aa4ca68eb6fb35/781.blob")
old, err = server.gcFindOldFiles(ageThreshold, log.With().Str("package", "shaman/test").Logger())
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, expectOld, old)
}
@ -151,18 +157,18 @@ func TestGCComponents(t *testing.T) {
// No symlinks created yet, so this should report all the files in oldFiles.
oldFiles := copymap(expectOld)
err := server.gcFilterLinkedFiles(server.config.CheckoutPath(), oldFiles, log.With().Str("package", "shaman/test").Logger(), nil)
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, expectOld, oldFiles)
// Create some symlinks
checkoutInfo, err := server.checkoutMan.PrepareCheckout("checkoutID")
assert.NoError(t, err)
require.NoError(t, err)
err = server.checkoutMan.SymlinkToCheckout(absPaths["3367.blob"], server.config.CheckoutPath(),
filepath.Join(checkoutInfo.RelativePath, "use-of-3367.blob"))
assert.NoError(t, err)
require.NoError(t, err)
err = server.checkoutMan.SymlinkToCheckout(absPaths["781.blob"], extraCheckoutDir,
filepath.Join(checkoutInfo.RelativePath, "use-of-781.blob"))
assert.NoError(t, err)
require.NoError(t, err)
// There should only be two old file reported now.
expectRemovable := mtimeMap{
@ -173,17 +179,17 @@ func TestGCComponents(t *testing.T) {
stats := GCStats{}
err = server.gcFilterLinkedFiles(server.config.CheckoutPath(), oldFiles, log.With().Str("package", "shaman/test").Logger(), &stats)
assert.Equal(t, 1, stats.numSymlinksChecked) // 1 is in checkoutPath, the other in extraCheckoutDir
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, len(expectRemovable)+1, len(oldFiles)) // one file is linked from the extra checkout dir
err = server.gcFilterLinkedFiles(extraCheckoutDir, oldFiles, log.With().Str("package", "shaman/test").Logger(), &stats)
assert.Equal(t, 2, stats.numSymlinksChecked) // 1 is in checkoutPath, the other in extraCheckoutDir
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, expectRemovable, oldFiles)
// Touching a file before requesting deletion should not delete it.
now := time.Now()
err = os.Chtimes(absPaths["6001.blob"], now, now)
assert.NoError(t, err)
require.NoError(t, err)
// Running the garbage collector should only remove that one unused and untouched file.
assert.FileExists(t, absPaths["6001.blob"], "file should exist before GC")
@ -198,7 +204,7 @@ func TestGCComponents(t *testing.T) {
assert.FileExists(t, absPaths["6001.blob"], "file should exist after GC")
assert.FileExists(t, absPaths["781.blob"], "file should exist after GC")
_, err = os.Stat(absPaths["7488.blob"])
assert.True(t, errors.Is(err, fs.ErrNotExist), "file %s should NOT exist after GC", absPaths["7488.blob"])
assert.ErrorIs(t, err, fs.ErrNotExist, "file %s should NOT exist after GC", absPaths["7488.blob"])
}
// Test of the high-level GCStorage() function.
@ -228,13 +234,13 @@ func TestGarbageCollect(t *testing.T) {
// Create some symlinks
checkoutInfo, err := server.checkoutMan.PrepareCheckout("checkoutID")
assert.NoError(t, err)
require.NoError(t, err)
err = server.checkoutMan.SymlinkToCheckout(absPaths["3367.blob"], server.config.CheckoutPath(),
filepath.Join(checkoutInfo.RelativePath, "use-of-3367.blob"))
assert.NoError(t, err)
require.NoError(t, err)
err = server.checkoutMan.SymlinkToCheckout(absPaths["781.blob"], extraCheckoutDir,
filepath.Join(checkoutInfo.RelativePath, "use-of-781.blob"))
assert.NoError(t, err)
require.NoError(t, err)
// Running the garbage collector should only remove those two unused files.
assert.FileExists(t, absPaths["6001.blob"], "file should exist before GC")
@ -244,9 +250,9 @@ func TestGarbageCollect(t *testing.T) {
assert.FileExists(t, absPaths["7488.blob"], "file should exist after dry-run GC")
server.GCStorage(false)
_, err = os.Stat(absPaths["6001.blob"])
assert.True(t, errors.Is(err, fs.ErrNotExist), "file %s should NOT exist after GC", absPaths["6001.blob"])
assert.ErrorIs(t, err, fs.ErrNotExist, "file %s should NOT exist after GC", absPaths["6001.blob"])
_, err = os.Stat(absPaths["7488.blob"])
assert.True(t, errors.Is(err, fs.ErrNotExist), "file %s should NOT exist after GC", absPaths["7488.blob"])
assert.ErrorIs(t, err, fs.ErrNotExist, "file %s should NOT exist after GC", absPaths["7488.blob"])
// Used files should still exist.
assert.FileExists(t, absPaths["781.blob"])

View File

@ -23,7 +23,6 @@
package config
import (
"io/ioutil"
"os"
"path/filepath"
"time"
@ -31,7 +30,7 @@ import (
// CreateTestConfig creates a configuration + cleanup function.
func CreateTestConfig() (conf Config, cleanup func()) {
tempDir, err := ioutil.TempDir("", "shaman-test-")
tempDir, err := os.MkdirTemp("", "shaman-test-")
if err != nil {
panic(err)
}

View File

@ -33,6 +33,7 @@ import (
"projects.blender.org/studio/flamenco/pkg/shaman/hasher"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/shaman/filestore"
)
@ -79,14 +80,14 @@ func TestStoreFile(t *testing.T) {
// The correct checksum should be accepted.
err = testWithChecksum(correctChecksum, filesize)
assert.NoError(t, err)
require.NoError(t, err)
path, status = server.fileStore.ResolveFile(correctChecksum, filesize, filestore.ResolveEverything)
assert.Equal(t, filestore.StatusStored, status)
assert.FileExists(t, path)
savedContent, err := ioutil.ReadFile(path)
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, payload, savedContent, "The file should be saved uncompressed")
}

View File

@ -29,6 +29,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// mustCreateFile creates an empty file.
@ -105,17 +106,17 @@ func TestOpenForUpload(t *testing.T) {
fileSize := int64(len(contents))
file, err := store.OpenForUpload("abcdefxxx", fileSize)
assert.NoError(t, err)
require.NoError(t, err)
_, err = file.Write(contents)
assert.NoError(t, err)
assert.NoError(t, file.Close())
require.NoError(t, err)
require.NoError(t, file.Close())
foundPath, status := store.ResolveFile("abcdefxxx", fileSize, ResolveEverything)
assert.Equal(t, file.Name(), foundPath)
assert.Equal(t, StatusUploading, status)
readContents, err := ioutil.ReadFile(foundPath)
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, contents, readContents)
}
@ -130,14 +131,14 @@ func TestMoveToStored(t *testing.T) {
assert.Error(t, err)
file, err := store.OpenForUpload("abcdefxxx", fileSize)
assert.NoError(t, err)
require.NoError(t, err)
_, err = file.Write(contents)
assert.NoError(t, err)
assert.NoError(t, file.Close())
require.NoError(t, err)
require.NoError(t, file.Close())
tempLocation := file.Name()
err = store.MoveToStored("abcdefxxx", fileSize, file.Name())
assert.NoError(t, err, "moving file %s", file.Name())
require.NoError(t, err, "moving file %s", file.Name())
foundPath, status := store.ResolveFile("abcdefxxx", fileSize, ResolveEverything)
assert.NotEqual(t, file.Name(), foundPath)

View File

@ -29,6 +29,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestStoragePrefix(t *testing.T) {
@ -61,7 +62,7 @@ func TestFilePermissions(t *testing.T) {
t.SkipNow()
}
dirname, err := os.MkdirTemp("", "file-permission-test")
assert.NoError(t, err)
require.NoError(t, err)
defer os.RemoveAll(dirname)
bin := storageBin{
@ -71,11 +72,11 @@ func TestFilePermissions(t *testing.T) {
}
file, err := bin.openForWriting("testfilename.blend")
assert.NoError(t, err)
require.NoError(t, err)
defer file.Close()
filestat, err := file.Stat()
assert.NoError(t, err)
require.NoError(t, err)
// The exact permissions depend on the current (unittest) process umask. This
// umask is not easy to get, which is why we have a copy of `tempfile.go` in

View File

@ -29,6 +29,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestTouch(t *testing.T) {
@ -46,7 +47,7 @@ func TestTouch(t *testing.T) {
assert.Nil(t, Touch(testPath))
stat, err := os.Stat(testPath)
assert.NoError(t, err)
require.NoError(t, err)
threshold := time.Now().Add(-5 * time.Second)
assert.True(t, stat.ModTime().After(threshold),

View File

@ -34,16 +34,6 @@
<span @click="copyElementText" class="click-to-copy">{{ jobData.id }}</span>
</dd>
<template v-if="workerTag">
<!-- TODO: fetch tag name and show that instead, and allow editing of the tag. -->
<dt class="field-name" title="Worker Tag">Tag</dt>
<dd :title="workerTag.description">
<span @click="copyElementData" class="click-to-copy" :data-clipboard="workerTag.id">{{
workerTag.name
}}</span>
</dd>
</template>
<dt class="field-name" title="Name">Name</dt>
<dd>{{ jobData.name }}</dd>
@ -52,9 +42,15 @@
{{ jobData.status }}
</dd>
<dt class="field-type" title="Type">Type</dt>
<dt class="field-type" title="Job Type">Job Type</dt>
<dd>{{ jobType ? jobType.label : jobData.type }}</dd>
<dt class="field-worker-tag" title="Worker Tag">Worker Tag</dt>
<dd v-if="workerTag" :title="workerTag.description">
{{ workerTag.name }}
</dd>
<dd v-else class="no-worker-tag">All Workers</dd>
<dt class="field-priority" title="Priority">Priority</dt>
<dd>
<PopoverEditableJobPriority :jobId="jobData.id" :priority="jobData.priority" />
@ -289,4 +285,8 @@ export default {
color: var(--indicator-color);
font-weight: bold;
}
dd.no-worker-tag {
color: var(--color-text-muted);
}
</style>

View File

@ -38,7 +38,7 @@ software. Development is supported by the Blender project.
### Simple and Portable
Flamenco consists of a few components and requires almost no configuration
Flamenco consists of a few components and requires almost no configuration in order
to be used in production.
{{< /columns >}}
@ -48,13 +48,13 @@ to be used in production.
### Easy Customization
Designed to be customizable, Flamenco allows TDs to specify
Job Types using the JavaScript language and seamlessly fit into the pipeline.
Job Types are specified using the JavaScript language and seamlessly fit into a pipeline.
<--->
### Cross-platform and Self-hosted
Flamenco runs on all major operating system, and is fully hosted on your own hardware.
Flamenco runs on all major operating systems, and is self hostable.
Your data is yours, and yours alone.
{{< /columns >}}
@ -63,7 +63,7 @@ Your data is yours, and yours alone.
### Robust Technology
The core of Flamenco is build using Go and SQLite. Check out the sources on
The core of Flamenco is built using Go and SQLite. Check out the source code on
[projects.blender.org](https://projects.blender.org/studio/flamenco).
<--->