Manager: allow setup to finish without Blender #104306

Manually merged
Sybren A. Stüvel merged 34 commits from abelli/flamenco:issue100195 into main 2024-09-09 11:22:42 +02:00
41 changed files with 1042 additions and 158 deletions
Showing only changes of commit 93d9d35ead - Show all commits

View File

@ -6,11 +6,14 @@ bugs in actually-released versions.
## 3.6 - in development
- Change the name of the add-on from "Flamenco 3" to just "Flamenco".
- Add `label` to job settings, to have full control over how they are presented in Blender's job submission GUI. If a job setting does not define a label, its `key` is used to generate one (like Flamenco 3.5 and older).
- Add `shellSplit(someString)` function to the job compiler scripts. This splits a string into an array of strings using shell/CLI semantics.
- Make it possible to script job submissions in Blender, by executing the `bpy.ops.flamenco.submit_job(job_name="jobname")` operator.
- Security updates of some deendencies:
- [GO-2024-2937: Parsing a corrupt or malicious image with invalid color indices can cause a panic](https://pkg.go.dev/vuln/GO-2024-2937)
- Web interface: list the job's worker tag in the job details.
- Ensure the submitted scene is rendered in a multi-scene blend file.
## 3.5 - released 2024-04-16

View File

@ -1,6 +1,6 @@
# Flamenco 3
# Flamenco
This repository contains the sources for Flamenco 3. The Manager, Worker, and
This repository contains the sources for Flamenco. The Manager, Worker, and
Blender add-on sources are all combined in this one repository.
The documentation is available on https://flamenco.blender.org/, including

View File

@ -1,4 +1,4 @@
# Flamenco 3 Blender add-on
# Flamenco Blender add-on
## Setting up development environment

View File

@ -3,7 +3,7 @@
# <pep8 compliant>
bl_info = {
"name": "Flamenco 3",
"name": "Flamenco",
"author": "Sybren A. Stüvel",
"version": (3, 6),
"blender": (3, 1, 0),
@ -156,6 +156,12 @@ def register() -> None:
max=100,
)
bpy.types.Scene.flamenco_job_submit_as_paused = bpy.props.BoolProperty(
name="Flamenco Job Submit as Paused",
description="Whether the job is paused initially; Checked sets the job to `paused`, and Unchecked sets the job to `queued`",
default=False,
)
preferences.register()
worker_tags.register()
operators.register()

View File

@ -32,6 +32,7 @@ MAX_FAILED_PATHS = 8
HashableShamanFileSpec = tuple[str, int, str]
"""Tuple of the 'sha', 'size', and 'path' fields of a ShamanFileSpec."""
# Mypy doesn't understand that submodules.pack.Packer exists.
class Packer(submodules.pack.Packer): # type: ignore
"""Creates BAT Packs on a Shaman server."""

View File

@ -22,7 +22,7 @@ class FLAMENCO_PT_job_submission(bpy.types.Panel):
bl_space_type = "PROPERTIES"
bl_region_type = "WINDOW"
bl_context = "output"
bl_label = "Flamenco 3"
bl_label = "Flamenco"
# A temporary job can be constructed so that dynamic, read-only properties can be evaluated.
# This is only scoped to a single draw() call.
@ -42,6 +42,9 @@ class FLAMENCO_PT_job_submission(bpy.types.Panel):
col = layout.column(align=True)
col.prop(context.scene, "flamenco_job_name", text="Job Name")
col.prop(context.scene, "flamenco_job_priority", text="Priority")
col.prop(
context.scene, "flamenco_job_submit_as_paused", text="Submit as Paused"
)
# Refreshables:
col = layout.column(align=True)
@ -50,7 +53,7 @@ class FLAMENCO_PT_job_submission(bpy.types.Panel):
)
if not job_types.are_job_types_available():
return
col.prop(context.scene, "flamenco_worker_tag", text="Tag")
col.prop(context.scene, "flamenco_worker_tag", text="Worker Tag")
# Job properties:
job_col = layout.column(align=True)

View File

@ -33,6 +33,7 @@ log = logging.getLogger(__name__)
def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]:
from flamenco.manager.models import SubmittedJob, JobMetadata
from flamenco.manager.model.job_status import JobStatus
propgroup = getattr(scene, "flamenco_job_settings", None)
assert isinstance(propgroup, JobTypePropertyGroup), "did not expect %s" % (
@ -44,6 +45,12 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]:
priority = getattr(scene, "flamenco_job_priority", 50)
submit_as_paused = getattr(scene, "flamenco_job_submit_as_paused", False)
if submit_as_paused:
initial_status = JobStatus("paused")
else:
initial_status = JobStatus("queued")
job: SubmittedJob = SubmittedJob(
name=scene.flamenco_job_name,
type=propgroup.job_type.name,
@ -52,6 +59,7 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]:
metadata=metadata,
submitter_platform=platform.system().lower(),
type_etag=propgroup.job_type.etag,
initial_status=initial_status,
)
worker_tag: str = getattr(scene, "flamenco_worker_tag", "")

View File

@ -126,6 +126,7 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
def poll(cls, context: bpy.types.Context) -> bool:
# Only allow submission when there is a job type selected.
job_type = job_types.active_job_type(context.scene)
cls.poll_message_set("No job type selected")
return job_type is not None
def execute(self, context: bpy.types.Context) -> set[str]:
@ -335,7 +336,14 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
)
prefs.experimental.use_all_linked_data_direct = True
filepath = Path(context.blend_data.filepath).with_suffix(".flamenco.blend")
filepath = Path(context.blend_data.filepath)
if job_submission.is_file_inside_job_storage(context, filepath):
self.log.info(
"Saving blendfile, already in shared storage: %s", filepath
)
bpy.ops.wm.save_as_mainfile()
else:
filepath = filepath.with_suffix(".flamenco.blend")
self.log.info("Saving copy to temporary file %s", filepath)
bpy.ops.wm.save_as_mainfile(
filepath=str(filepath), compress=True, copy=True

View File

@ -148,7 +148,13 @@ func runFlamencoManager() bool {
log.Fatal().Err(err).Msg("unable to figure out my own URL")
}
ssdp := makeAutoDiscoverable(urls)
// Construct the UPnP/SSDP server.
var ssdp *upnp_ssdp.Server
if configService.Get().SSDPDiscovery {
ssdp = makeAutoDiscoverable(urls)
} else {
log.Debug().Msg("UPnP/SSDP autodiscovery disabled in configuration")
}
// Construct the services.
persist := openDB(*configService)

View File

@ -89,6 +89,56 @@ func TestQueryJobs(t *testing.T) {
assertResponseJSON(t, echoCtx, http.StatusOK, expectedJobs)
}
func TestFetchJob(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
dbJob := persistence.Job{
UUID: "afc47568-bd9d-4368-8016-e91d945db36d",
Name: "работа",
JobType: "test",
Priority: 50,
Status: api.JobStatusActive,
Settings: persistence.StringInterfaceMap{
"result": "/render/frames/exploding.kittens",
},
Metadata: persistence.StringStringMap{
"project": "/projects/exploding-kittens",
},
WorkerTag: &persistence.WorkerTag{
UUID: "d86e1b84-5ee2-4784-a178-65963eeb484b",
Name: "Tikkie terug Kees!",
Description: "",
},
}
echoCtx := mf.prepareMockedRequest(nil)
mf.persistence.EXPECT().FetchJob(gomock.Any(), dbJob.UUID).Return(&dbJob, nil)
require.NoError(t, mf.flamenco.FetchJob(echoCtx, dbJob.UUID))
expectedJob := api.Job{
SubmittedJob: api.SubmittedJob{
Name: "работа",
Type: "test",
Priority: 50,
Settings: &api.JobSettings{AdditionalProperties: map[string]interface{}{
"result": "/render/frames/exploding.kittens",
}},
Metadata: &api.JobMetadata{AdditionalProperties: map[string]string{
"project": "/projects/exploding-kittens",
}},
WorkerTag: ptr("d86e1b84-5ee2-4784-a178-65963eeb484b"),
},
Id: "afc47568-bd9d-4368-8016-e91d945db36d",
Status: api.JobStatusActive,
}
assertResponseJSON(t, echoCtx, http.StatusOK, expectedJob)
}
func TestFetchTask(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

View File

@ -243,6 +243,85 @@ func TestReplaceTwoWayVariables(t *testing.T) {
}
}
// TestReplaceTwoWayVariablesFFmpegExpression tests that slashes (for division)
// in an FFmpeg filter expression are NOT replaced with backslashes when sending
// to a Windows worker.
func TestReplaceTwoWayVariablesFFmpegExpression(t *testing.T) {
c := config.DefaultConfig(func(c *config.Conf) {
// Mock that the Manager is running Linux.
c.MockCurrentGOOSForTests("linux")
// Trigger a translation of a path in the FFmpeg command arguments.
c.Variables["project"] = config.Variable{
IsTwoWay: true,
Values: []config.VariableValue{
{Value: "/projects/charge", Platform: config.VariablePlatformLinux, Audience: config.VariableAudienceAll},
{Value: `P:\charge`, Platform: config.VariablePlatformWindows, Audience: config.VariableAudienceAll},
},
}
})
task := api.AssignedTask{
Job: "f0bde4d0-eaaf-4ee0-976b-802a86aa2d02",
JobPriority: 50,
JobType: "simple-blender-render",
Name: "preview-video",
Priority: 50,
Status: api.TaskStatusQueued,
TaskType: "ffmpeg",
Uuid: "fd963a82-2e98-4a39-9bd4-c302e5b8814f",
Commands: []api.Command{
{
Name: "frames-to-video",
Parameters: map[string]interface{}{
"exe": "ffmpeg", // Should not change.
"fps": 24, // Should not change type.
"inputGlob": "/projects/charge/renders/*.webp", // Path, should change.
"outputFile": "/projects/charge/renders/video.mp4", // Path, should change.
"args": []string{
"-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", // Should not change.
"-fake-lut", `/projects/charge/ffmpeg.lut`, // Path, should change.
},
},
},
},
}
worker := persistence.Worker{Platform: "windows"}
replacedTask := replaceTaskVariables(&c, task, worker)
expectTask := api.AssignedTask{
Job: "f0bde4d0-eaaf-4ee0-976b-802a86aa2d02",
JobPriority: 50,
JobType: "simple-blender-render",
Name: "preview-video",
Priority: 50,
Status: api.TaskStatusQueued,
TaskType: "ffmpeg",
Uuid: "fd963a82-2e98-4a39-9bd4-c302e5b8814f",
Commands: []api.Command{
{
Name: "frames-to-video",
Parameters: map[string]interface{}{
"exe": "ffmpeg",
"fps": 24,
// These two parameters matched a two-way variable:
"inputGlob": `P:\charge\renders\*.webp`,
"outputFile": `P:\charge\renders\video.mp4`,
"args": []string{
// This parameter should not change:
"-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
// This parameter should change:
"-fake-lut", `P:\charge\ffmpeg.lut`,
},
},
},
},
}
assert.Equal(t, expectTask, replacedTask)
}
func varReplSubmittedJob() api.SubmittedJob {
return api.SubmittedJob{
Type: "simple-blender-render",
@ -273,7 +352,7 @@ func varReplSubmittedJob() api.SubmittedJob {
}
// jsonWash converts the given value to JSON and back.
// This makes sure the types are as closed to what the API will handle as
// This makes sure the types are as close to what the API will handle as
// possible, making the difference between "array of strings" and "array of
// interface{}s that happen to be strings".
func jsonWash[T any](value T) T {

View File

@ -313,8 +313,11 @@ func (f *Flamenco) FetchWorkerTag(e echo.Context, tagUUID string) error {
logger.Error().Err(err).Msg("fetching worker tag")
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tag: %v", err)
}
if tag == nil {
panic("Could fetch a worker tag without error, but then the returned tag was still nil")
}
return e.JSON(http.StatusOK, workerTagDBtoAPI(*tag))
return e.JSON(http.StatusOK, workerTagDBtoAPI(tag))
}
func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error {
@ -387,8 +390,8 @@ func (f *Flamenco) FetchWorkerTags(e echo.Context) error {
apiTags := []api.WorkerTag{}
for _, dbTag := range dbTags {
apiTag := workerTagDBtoAPI(*dbTag)
apiTags = append(apiTags, apiTag)
apiTag := workerTagDBtoAPI(dbTag)
apiTags = append(apiTags, *apiTag)
}
tagList := api.WorkerTagList{
@ -443,7 +446,7 @@ func (f *Flamenco) CreateWorkerTag(e echo.Context) error {
sioUpdate := eventbus.NewWorkerTagUpdate(&dbTag)
f.broadcaster.BroadcastNewWorkerTag(sioUpdate)
return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag))
return e.JSON(http.StatusOK, workerTagDBtoAPI(&dbTag))
}
func workerSummary(w persistence.Worker) api.WorkerSummary {
@ -479,7 +482,7 @@ func workerDBtoAPI(w persistence.Worker) api.Worker {
if len(w.Tags) > 0 {
tags := []api.WorkerTag{}
for i := range w.Tags {
tags = append(tags, workerTagDBtoAPI(*w.Tags[i]))
tags = append(tags, *workerTagDBtoAPI(w.Tags[i]))
}
apiWorker.Tags = &tags
}
@ -487,7 +490,11 @@ func workerDBtoAPI(w persistence.Worker) api.Worker {
return apiWorker
}
func workerTagDBtoAPI(wc persistence.WorkerTag) api.WorkerTag {
func workerTagDBtoAPI(wc *persistence.WorkerTag) *api.WorkerTag {
if wc == nil {
return nil
}
uuid := wc.UUID // Take a copy for safety.
apiTag := api.WorkerTag{
@ -497,5 +504,5 @@ func workerTagDBtoAPI(wc persistence.WorkerTag) api.WorkerTag {
if len(wc.Description) > 0 {
apiTag.Description = &wc.Description
}
return apiTag
return &apiTag
}

View File

@ -106,6 +106,10 @@ func (ve *VariableExpander) Expand(valueToExpand string) string {
isPathValue := false
for varname, varvalue := range ve.targetTwoWayVars {
placeholder := fmt.Sprintf("{%s}", varname)
if !strings.Contains(expanded, placeholder) {
continue
}
expanded = strings.Replace(expanded, placeholder, varvalue, -1)
// Since two-way variables are meant for path replacement, we know this

View File

@ -139,6 +139,41 @@ func TestSimpleBlenderRenderHappy(t *testing.T) {
assert.Equal(t, expectDeps, tVideo.Dependencies)
}
func TestSimpleBlenderRenderWithScene(t *testing.T) {
c := mockedClock(t)
s, err := Load(c)
require.NoError(t, err)
// Compiling a job should be really fast.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
sj := exampleSubmittedJob()
sj.Settings.AdditionalProperties["scene"] = "Test Scene"
aj, err := s.Compile(ctx, sj)
require.NoError(t, err)
require.NotNil(t, aj)
t0 := aj.Tasks[0]
expectCliArgs := []interface{}{ // They are strings, but Goja doesn't know that and will produce an []interface{}.
"--scene", "Test Scene",
"--render-output", "/render/sprites/farm_output/promo/square_ellie/square_ellie.lighting_light_breakdown2/######",
"--render-format", "PNG",
"--render-frame", "1..3",
}
assert.Equal(t, "render-1-3", t0.Name)
assert.Equal(t, 1, len(t0.Commands))
assert.Equal(t, "blender-render", t0.Commands[0].Name)
assert.EqualValues(t, AuthoredCommandParameters{
"exe": "{blender}",
"exeArgs": "{blenderArgs}",
"blendfile": "/render/sf/jobs/scene123.blend",
"args": expectCliArgs,
"argsBefore": make([]interface{}, 0),
}, t0.Commands[0].Parameters)
}
func TestJobWithoutTag(t *testing.T) {
c := mockedClock(t)

View File

@ -32,6 +32,8 @@ const JOB_TYPE = {
description: "File extension used when rendering images" },
{ key: "has_previews", type: "bool", required: false, eval: "C.scene.render.image_settings.use_preview", visible: "hidden",
description: "Whether Blender will render preview images."},
{ key: "scene", type: "string", required: true, eval: "C.scene.name", visible: "web",
description: "Name of the scene to render."},
]
};
@ -100,6 +102,12 @@ function authorRenderTasks(settings, renderDir, renderOutput) {
print("authorRenderTasks(", renderDir, renderOutput, ")");
let renderTasks = [];
let chunks = frameChunker(settings.frames, settings.chunk_size);
let baseArgs = [];
if (settings.scene) {
baseArgs = baseArgs.concat(["--scene", settings.scene]);
}
for (let chunk of chunks) {
const task = author.Task(`render-${chunk}`, "blender");
const command = author.Command("blender-render", {
@ -107,11 +115,11 @@ function authorRenderTasks(settings, renderDir, renderOutput) {
exeArgs: "{blenderArgs}",
argsBefore: [],
blendfile: settings.blendfile,
args: [
args: baseArgs.concat([
"--render-output", path.join(renderDir, path.basename(renderOutput)),
"--render-format", settings.format,
"--render-frame", chunk.replaceAll("-", ".."), // Convert to Blender frame range notation.
]
])
});
task.addCommand(command);
renderTasks.push(task);

View File

@ -0,0 +1,334 @@
const JOB_TYPE = {
label: "Single Image Render",
description: "Distributed rendering of a single image.",
settings: [
// Settings for artists to determine:
{
key: "tile_size_x",
type: "int32",
default: 64,
description: "Tile size in pixels for the X axis"
},
{
key: "tile_size_y",
type: "int32",
default: 64,
description: "Tile size in pixels for the Y axis"
},
{
key: "frame", type: "int32", required: true,
eval: "C.scene.frame_current",
description: "Frame to render. Examples: '47', '1'"
},
// render_output_root + add_path_components determine the value of render_output_path.
{
key: "render_output_root",
type: "string",
subtype: "dir_path",
required: true,
visible: "submission",
description: "Base directory of where render output is stored. Will have some job-specific parts appended to it"
},
{
key: "add_path_components",
type: "int32",
required: true,
default: 0,
propargs: {min: 0, max: 32},
visible: "submission",
description: "Number of path components of the current blend file to use in the render output path"
},
{
key: "render_output_path", type: "string", subtype: "file_path", editable: false,
eval: "str(Path(abspath(settings.render_output_root), last_n_dir_parts(settings.add_path_components), jobname, '{timestamp}', 'tiles'))",
description: "Final file path of where render output will be saved"
},
// Automatically evaluated settings:
{
key: "blendfile",
type: "string",
required: true,
description: "Path of the Blend file to render",
visible: "web"
},
{
key: "format",
type: "string",
required: true,
eval: "C.scene.render.image_settings.file_format",
visible: "web"
},
{
key: "image_file_extension",
type: "string",
required: true,
eval: "C.scene.render.file_extension",
visible: "hidden",
description: "File extension used when rendering images"
},
{
key: "resolution_x",
type: "int32",
required: true,
eval: "C.scene.render.resolution_x",
visible: "hidden",
description: "Resolution X"
},
{
key: "resolution_y",
type: "int32",
required: true,
eval: "C.scene.render.resolution_y",
visible: "hidden",
description: "Resolution Y"
},
{
key: "resolution_scale",
type: "int32",
required: true,
eval: "C.scene.render.resolution_percentage",
visible: "hidden",
description: "Resolution scale"
}
]
};
function compileJob(job) {
print("Single Image Render job submitted");
print("job: ", job);
const settings = job.settings;
const renderOutput = renderOutputPath(job);
if (settings.resolution_scale !== 100) {
throw "Flamenco currently does not support rendering with a resolution scale other than 100%";
}
// Make sure that when the job is investigated later, it shows the
// actually-used render output:
settings.render_output_path = renderOutput;
const renderDir = path.dirname(renderOutput);
const renderTasks = authorRenderTasks(settings, renderDir, renderOutput);
const mergeTask = authorMergeTask(settings, renderDir);
for (const rt of renderTasks) {
job.addTask(rt);
}
if (mergeTask) {
// If there is a merge task, all other tasks have to be done first.
for (const rt of renderTasks) {
mergeTask.addDependency(rt);
}
job.addTask(mergeTask);
}
}
// Do field replacement on the render output path.
function renderOutputPath(job) {
let path = job.settings.render_output_path;
if (!path) {
throw "no render_output_path setting!";
}
return path.replace(/{([^}]+)}/g, (match, group0) => {
switch (group0) {
case "timestamp":
return formatTimestampLocal(job.created);
default:
return match;
}
});
}
// Calculate the borders for the tiles
// Does not take into account the overscan
function calcBorders(tileSizeX, tileSizeY, width, height) {
let borders = [];
for (let y = 0; y < height; y += tileSizeY) {
for (let x = 0; x < width; x += tileSizeX) {
borders.push([x, y, Math.min(x + tileSizeX, width), Math.min(y + tileSizeY, height)]);
}
}
print("borders: ", borders);
return borders;
}
function authorRenderTasks(settings, renderDir, renderOutput) {
print("authorRenderTasks(", renderDir, renderOutput, ")");
let renderTasks = [];
let borders = calcBorders(settings.tile_size_x, settings.tile_size_y, settings.resolution_x, settings.resolution_y);
for (let border of borders) {
const task = author.Task(`render-${border[0]}-${border[1]}`, "blender");
// Overscan is calculated in this manner to avoid rendering outside the image resolution
let pythonExpr = `import bpy
scene = bpy.context.scene
render = scene.render
render.image_settings.file_format = 'OPEN_EXR_MULTILAYER'
render.use_compositing = False
render.use_stamp = False
overscan = 16
render.border_min_x = max(${border[0]} - overscan, 0) / ${settings.resolution_x}
render.border_min_y = max(${border[1]} - overscan, 0) / ${settings.resolution_y}
render.border_max_x = min(${border[2]} + overscan, ${settings.resolution_x}) / ${settings.resolution_x}
render.border_max_y = min(${border[3]} + overscan, ${settings.resolution_x}) / ${settings.resolution_y}
render.use_border = True
render.use_crop_to_border = True
bpy.ops.render.render(write_still=True)`
const command = author.Command("blender-render", {
exe: "{blender}",
exeArgs: "{blenderArgs}",
argsBefore: [],
blendfile: settings.blendfile,
args: [
"--render-output", path.join(renderDir, path.basename(renderOutput), border[0] + "-" + border[1] + "-" + border[2] + "-" + border[3]),
"--render-format", settings.format,
"--python-expr", pythonExpr
]
});
task.addCommand(command);
renderTasks.push(task);
}
return renderTasks;
}
function authorMergeTask(settings, renderDir, renderOutput) {
print("authorMergeTask(", renderDir, ")");
const task = author.Task("merge", "blender");
// Burning metadata into the image is done by the compositor for the entire merged image
// The overall logic of the merge is as follows:
// 1. Find out the Render Layers node and to which socket it is connected
// 2. Load image files from the tiles directory.
// Their correct position is determined by their filename.
// 3. Create a node tree that scales, translates and adds the tiles together.
// A simple version of the node tree is linked here:
// https://devtalk.blender.org/uploads/default/original/3X/f/0/f047f221c70955b32e4b455e53453c5df716079e.jpeg
// 4. The final image is then fed into the socket the Render Layers node was connected to.
// This allows the compositing to work as if the image was rendered in one go.
let pythonExpr = `import bpy
render = bpy.context.scene.render
render.resolution_x = ${settings.resolution_x}
render.resolution_y = ${settings.resolution_y}
bpy.context.scene.use_nodes = True
render.use_compositing = True
render.use_stamp = True
node_tree = bpy.context.scene.node_tree
overscan = 16
render_layers_node = None
for node in node_tree.nodes:
if node.type == 'R_LAYERS':
feed_in_input = node.outputs[0]
render_layers_node = node
break
for link in node_tree.links:
if feed_in_input is not None and link.from_socket == feed_in_input:
feed_in_output = link.to_socket
break
from pathlib import Path
root = Path("${path.join(renderDir, path.basename(renderOutput))}/tiles")
image_files = [f for f in root.iterdir() if f.is_file()]
separate_nodes = []
first_crop_node = None
translate_nodes = []
min_width = min([int(f.stem.split('-')[2]) - int(f.stem.split('-')[0]) for f in image_files])
min_height = min([int(f.stem.split('-')[3]) - int(f.stem.split('-')[1]) for f in image_files])
for i, image_file in enumerate(image_files):
image_node = node_tree.nodes.new('CompositorNodeImage')
image_node.image = bpy.data.images.load(str(root / image_file.name))
crop_node = node_tree.nodes.new('CompositorNodeCrop')
crop_node.use_crop_size = True
left, top, right, bottom = image_file.stem.split('-')
actual_width = int(right) - int(left)
actual_height = int(bottom) - int(top)
if left == '0':
crop_node.min_x = 0
crop_node.max_x = actual_width
else:
crop_node.min_x = overscan
crop_node.max_x = actual_width + overscan
if top == '0':
crop_node.max_y = 0
crop_node.min_y = actual_height
else:
crop_node.max_y = overscan
crop_node.min_y = actual_height + overscan
if i == 0:
first_crop_node = crop_node
translate_node = node_tree.nodes.new('CompositorNodeTranslate')
# translate_node.use_relative = True
translate_node.inputs[1].default_value = float(left) + (actual_width - ${settings.resolution_x}) / 2
translate_node.inputs[2].default_value = float(top) + (actual_height - ${settings.resolution_y}) / 2
translate_nodes.append(translate_node)
separate_node = node_tree.nodes.new('CompositorNodeSeparateColor')
separate_nodes.append(separate_node)
node_tree.links.new(image_node.outputs[0], crop_node.inputs[0])
node_tree.links.new(crop_node.outputs[0], translate_node.inputs[0])
node_tree.links.new(translate_node.outputs[0], separate_node.inputs[0])
scale_node = node_tree.nodes.new('CompositorNodeScale')
scale_node.space = 'RELATIVE'
scale_node.inputs[1].default_value = ${settings.resolution_x} / min_width
scale_node.inputs[2].default_value = ${settings.resolution_y} / min_height
node_tree.links.new(first_crop_node.outputs[0], scale_node.inputs[0])
mix_node = node_tree.nodes.new('CompositorNodeMixRGB')
mix_node.blend_type = 'MIX'
mix_node.inputs[0].default_value = 0.0
mix_node.inputs[1].default_value = (0, 0, 0, 1)
node_tree.links.new(scale_node.outputs[0], mix_node.inputs[2])
mix_adds = [node_tree.nodes.new('CompositorNodeMixRGB') for _ in range(len(separate_nodes))]
math_adds = [node_tree.nodes.new('CompositorNodeMath') for _ in range(len(separate_nodes))]
for i, mix_add in enumerate(mix_adds):
mix_add.blend_type = 'ADD'
if i == 0:
node_tree.links.new(mix_node.outputs[0], mix_add.inputs[1])
else:
node_tree.links.new(mix_adds[i - 1].outputs[0], mix_add.inputs[1])
node_tree.links.new(translate_nodes[i].outputs[0], mix_add.inputs[2])
for i, math_add in enumerate(math_adds):
math_add.operation = 'ADD'
if i == 0:
node_tree.links.new(mix_node.outputs[0], math_add.inputs[0])
else:
node_tree.links.new(math_adds[i - 1].outputs[0], math_add.inputs[0])
node_tree.links.new(separate_nodes[i - 1].outputs[3], math_add.inputs[1])
set_alpha_node = node_tree.nodes.new('CompositorNodeSetAlpha')
set_alpha_node.mode = 'REPLACE_ALPHA'
node_tree.links.new(mix_adds[-1].outputs[0], set_alpha_node.inputs[0])
node_tree.links.new(math_adds[-1].outputs[0], set_alpha_node.inputs[1])
if feed_in_input is not None:
node_tree.links.new(set_alpha_node.outputs[0], feed_in_output)
else:
raise Exception('No Render Layers Node found. Currently only supported with a Render Layers Node in the Compositor.')
node_tree.nodes.remove(render_layers_node)
bpy.ops.render.render(write_still=True)`
const command = author.Command("blender-render", {
exe: "{blender}",
exeArgs: "{blenderArgs}",
argsBefore: [],
blendfile: settings.blendfile,
args: [
"--render-output", path.join(renderDir, path.basename(renderOutput), "merged"),
"--render-format", settings.format,
"--python-expr", pythonExpr
]
});
task.addCommand(command);
return task;
}

View File

@ -359,6 +359,18 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
if err != nil {
return nil, err
}
if sqlcJob.WorkerTagID.Valid {
workerTag, err := fetchWorkerTagByID(db.gormDB, uint(sqlcJob.WorkerTagID.Int64))
switch {
case errors.Is(err, sql.ErrNoRows):
return nil, ErrWorkerTagNotFound
case err != nil:
return nil, workerTagError(err, "fetching worker tag of job")
}
gormJob.WorkerTag = workerTag
}
return &gormJob, nil
}

View File

@ -7,7 +7,7 @@ import (
"math"
"time"
"gorm.io/gorm/clause"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
)
// JobBlock keeps track of which Worker is not allowed to run which task type on which job.
@ -28,66 +28,76 @@ type JobBlock struct {
// AddWorkerToJobBlocklist prevents this Worker of getting any task, of this type, on this job, from the task scheduler.
func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Worker, taskType string) error {
entry := JobBlock{
Job: job,
Worker: worker,
if job.ID == 0 {
panic("Cannot add worker to job blocklist with zero job ID")
}
if worker.ID == 0 {
panic("Cannot add worker to job blocklist with zero worker ID")
}
if taskType == "" {
panic("Cannot add worker to job blocklist with empty task type")
}
queries, err := db.queries()
if err != nil {
return err
}
return queries.AddWorkerToJobBlocklist(ctx, sqlc.AddWorkerToJobBlocklistParams{
CreatedAt: db.now().Time,
JobID: int64(job.ID),
WorkerID: int64(worker.ID),
TaskType: taskType,
}
tx := db.gormDB.WithContext(ctx).
Clauses(clause.OnConflict{DoNothing: true}).
Create(&entry)
return tx.Error
})
}
// FetchJobBlocklist fetches the blocklist for the given job.
// Workers are fetched too, and embedded in the returned list.
func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock, error) {
entries := []JobBlock{}
queries, err := db.queries()
if err != nil {
return nil, err
}
tx := db.gormDB.WithContext(ctx).
Model(JobBlock{}).
Joins("inner join jobs on jobs.id = job_blocks.job_id").
Joins("Worker").
Where("jobs.uuid = ?", jobUUID).
Order("Worker.name").
Scan(&entries)
return entries, tx.Error
rows, err := queries.FetchJobBlocklist(ctx, jobUUID)
if err != nil {
return nil, err
}
entries := make([]JobBlock, len(rows))
for idx, row := range rows {
entries[idx].ID = uint(row.JobBlock.ID)
entries[idx].CreatedAt = row.JobBlock.CreatedAt
entries[idx].TaskType = row.JobBlock.TaskType
entries[idx].JobID = uint(row.JobBlock.JobID)
entries[idx].WorkerID = uint(row.JobBlock.WorkerID)
worker := convertSqlcWorker(row.Worker)
entries[idx].Worker = &worker
}
return entries, nil
}
// ClearJobBlocklist removes the entire blocklist of this job.
func (db *DB) ClearJobBlocklist(ctx context.Context, job *Job) error {
tx := db.gormDB.WithContext(ctx).
Where("job_id = ?", job.ID).
Delete(JobBlock{})
return tx.Error
queries, err := db.queries()
if err != nil {
return err
}
return queries.ClearJobBlocklist(ctx, job.UUID)
}
func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error {
// Find the job ID.
job := Job{}
tx := db.gormDB.WithContext(ctx).
Select("id").
Where("uuid = ?", jobUUID).
Find(&job)
if tx.Error != nil {
return jobError(tx.Error, "fetching job with uuid=%q", jobUUID)
queries, err := db.queries()
if err != nil {
return err
}
// Find the worker ID.
worker := Worker{}
tx = db.gormDB.WithContext(ctx).
Select("id").
Where("uuid = ?", workerUUID).
Find(&worker)
if tx.Error != nil {
return workerError(tx.Error, "fetching worker with uuid=%q", workerUUID)
}
// Remove the blocklist entry.
tx = db.gormDB.WithContext(ctx).
Where("job_id = ?", job.ID).
Where("worker_id = ?", worker.ID).
Where("task_type = ?", taskType).
Delete(JobBlock{})
return tx.Error
return queries.RemoveFromJobBlocklist(ctx, sqlc.RemoveFromJobBlocklistParams{
JobUUID: jobUUID,
WorkerUUID: workerUUID,
TaskType: taskType,
})
}
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.

View File

@ -75,6 +75,38 @@ func TestStoreAuthoredJobWithShamanCheckoutID(t *testing.T) {
assert.Equal(t, job.Storage.ShamanCheckoutID, fetchedJob.Storage.ShamanCheckoutID)
}
func TestStoreAuthoredJobWithWorkerTag(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(1 * time.Second)
defer cancel()
workerTagUUID := "daa811ac-6861-4004-8748-7700aebc244c"
require.NoError(t, db.CreateWorkerTag(ctx, &WorkerTag{
UUID: workerTagUUID,
Name: "🐈",
Description: "Mrieuw",
}))
workerTag, err := db.FetchWorkerTag(ctx, workerTagUUID)
require.NoError(t, err)
job := createTestAuthoredJobWithTasks()
job.WorkerTagUUID = workerTagUUID
err = db.StoreAuthoredJob(ctx, job)
require.NoError(t, err)
fetchedJob, err := db.FetchJob(ctx, job.JobID)
require.NoError(t, err)
require.NotNil(t, fetchedJob)
require.NotNil(t, fetchedJob.WorkerTagID)
assert.Equal(t, *fetchedJob.WorkerTagID, workerTag.ID)
require.NotNil(t, fetchedJob.WorkerTag)
assert.Equal(t, fetchedJob.WorkerTag.Name, workerTag.Name)
assert.Equal(t, fetchedJob.WorkerTag.Description, workerTag.Description)
assert.Equal(t, fetchedJob.WorkerTag.UUID, workerTagUUID)
}
func TestFetchTaskJobUUID(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(1 * time.Second)
defer cancel()

View File

@ -244,3 +244,28 @@ ON CONFLICT DO UPDATE
-- name: GetLastRenderedJobUUID :one
SELECT uuid FROM jobs
INNER JOIN last_rendereds LR ON jobs.id = LR.job_id;
-- name: AddWorkerToJobBlocklist :exec
-- Add a worker to a job's blocklist.
INSERT INTO job_blocks (created_at, job_id, worker_id, task_type)
VALUES (@created_at, @job_id, @worker_id, @task_type)
ON CONFLICT DO NOTHING;
-- name: FetchJobBlocklist :many
SELECT sqlc.embed(job_blocks), sqlc.embed(workers)
FROM job_blocks
INNER JOIN jobs ON jobs.id = job_blocks.job_id
INNER JOIN workers on workers.id = job_blocks.worker_id
WHERE jobs.uuid = @jobuuid
ORDER BY workers.name;
-- name: ClearJobBlocklist :exec
DELETE FROM job_blocks
WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid);
-- name: RemoveFromJobBlocklist :exec
DELETE FROM job_blocks
WHERE
job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid)
AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=@workeruuid)
AND job_blocks.task_type = @task_type;

View File

@ -13,6 +13,30 @@ import (
"time"
)
const addWorkerToJobBlocklist = `-- name: AddWorkerToJobBlocklist :exec
INSERT INTO job_blocks (created_at, job_id, worker_id, task_type)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT DO NOTHING
`
type AddWorkerToJobBlocklistParams struct {
CreatedAt time.Time
JobID int64
WorkerID int64
TaskType string
}
// Add a worker to a job's blocklist.
func (q *Queries) AddWorkerToJobBlocklist(ctx context.Context, arg AddWorkerToJobBlocklistParams) error {
_, err := q.db.ExecContext(ctx, addWorkerToJobBlocklist,
arg.CreatedAt,
arg.JobID,
arg.WorkerID,
arg.TaskType,
)
return err
}
const addWorkerToTaskFailedList = `-- name: AddWorkerToTaskFailedList :exec
INSERT INTO task_failures (created_at, task_id, worker_id)
VALUES (?1, ?2, ?3)
@ -50,6 +74,16 @@ func (q *Queries) ClearFailureListOfTask(ctx context.Context, taskID int64) erro
return err
}
const clearJobBlocklist = `-- name: ClearJobBlocklist :exec
DELETE FROM job_blocks
WHERE job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1)
`
func (q *Queries) ClearJobBlocklist(ctx context.Context, jobuuid string) error {
_, err := q.db.ExecContext(ctx, clearJobBlocklist, jobuuid)
return err
}
const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one
SELECT count(*) as num_failed FROM task_failures
WHERE task_id=?1
@ -217,6 +251,65 @@ func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) {
return i, err
}
const fetchJobBlocklist = `-- name: FetchJobBlocklist :many
SELECT job_blocks.id, job_blocks.created_at, job_blocks.job_id, job_blocks.worker_id, job_blocks.task_type, workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart
FROM job_blocks
INNER JOIN jobs ON jobs.id = job_blocks.job_id
INNER JOIN workers on workers.id = job_blocks.worker_id
WHERE jobs.uuid = ?1
ORDER BY workers.name
`
type FetchJobBlocklistRow struct {
JobBlock JobBlock
Worker Worker
}
func (q *Queries) FetchJobBlocklist(ctx context.Context, jobuuid string) ([]FetchJobBlocklistRow, error) {
rows, err := q.db.QueryContext(ctx, fetchJobBlocklist, jobuuid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []FetchJobBlocklistRow
for rows.Next() {
var i FetchJobBlocklistRow
if err := rows.Scan(
&i.JobBlock.ID,
&i.JobBlock.CreatedAt,
&i.JobBlock.JobID,
&i.JobBlock.WorkerID,
&i.JobBlock.TaskType,
&i.Worker.ID,
&i.Worker.CreatedAt,
&i.Worker.UpdatedAt,
&i.Worker.UUID,
&i.Worker.Secret,
&i.Worker.Name,
&i.Worker.Address,
&i.Worker.Platform,
&i.Worker.Software,
&i.Worker.Status,
&i.Worker.LastSeenAt,
&i.Worker.StatusRequested,
&i.Worker.LazyStatusRequest,
&i.Worker.SupportedTaskTypes,
&i.Worker.DeletedAt,
&i.Worker.CanRestart,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const fetchJobByID = `-- name: FetchJobByID :one
SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs
WHERE id = ? LIMIT 1
@ -758,6 +851,25 @@ func (q *Queries) JobCountTasksInStatus(ctx context.Context, arg JobCountTasksIn
return num_tasks, err
}
const removeFromJobBlocklist = `-- name: RemoveFromJobBlocklist :exec
DELETE FROM job_blocks
WHERE
job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=?1)
AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=?2)
AND job_blocks.task_type = ?3
`
type RemoveFromJobBlocklistParams struct {
JobUUID string
WorkerUUID string
TaskType string
}
func (q *Queries) RemoveFromJobBlocklist(ctx context.Context, arg RemoveFromJobBlocklistParams) error {
_, err := q.db.ExecContext(ctx, removeFromJobBlocklist, arg.JobUUID, arg.WorkerUUID, arg.TaskType)
return err
}
const requestJobDeletion = `-- name: RequestJobDeletion :exec
UPDATE jobs SET
updated_at = ?1,

View File

@ -53,6 +53,16 @@ func fetchWorkerTag(gormDB *gorm.DB, uuid string) (*WorkerTag, error) {
return &w, nil
}
// fetchWorkerTagByID fetches the worker tag using the given database instance.
func fetchWorkerTagByID(gormDB *gorm.DB, id uint) (*WorkerTag, error) {
w := WorkerTag{}
tx := gormDB.First(&w, "id = ?", id)
if tx.Error != nil {
return nil, workerTagError(tx.Error, "fetching worker tag")
}
return &w, nil
}
func (db *DB) SaveWorkerTag(ctx context.Context, tag *WorkerTag) error {
if err := db.gormDB.WithContext(ctx).Save(tag).Error; err != nil {
return workerTagError(err, "saving worker tag")

View File

@ -25,7 +25,7 @@ import (
"projects.blender.org/studio/flamenco/pkg/crosspath"
)
type CreateVideoParams struct {
type FramesToVideoParams struct {
exe string // Executable path defined by the Manager.
exeArgs string // Its CLI parameters defined by the Manager.
fps float64 // Frames per second of the video file.
@ -101,9 +101,9 @@ func (ce *CommandExecutor) cmdFramesToVideoExeCommand(
return execCmd, cleanup, nil
}
func cmdFramesToVideoParams(logger zerolog.Logger, cmd api.Command) (CreateVideoParams, error) {
func cmdFramesToVideoParams(logger zerolog.Logger, cmd api.Command) (FramesToVideoParams, error) {
var (
parameters CreateVideoParams
parameters FramesToVideoParams
ok bool
)
@ -172,7 +172,7 @@ func cmdFramesToVideoParams(logger zerolog.Logger, cmd api.Command) (CreateVideo
// getInputGlob constructs CLI arguments for FFmpeg input file globbing.
// The 2nd return value is a cleanup function.
func (p *CreateVideoParams) getInputGlob() ([]string, func(), error) {
func (p *FramesToVideoParams) getInputGlob() ([]string, func(), error) {
if runtime.GOOS == "windows" {
return createIndexFile(p.inputGlob, p.fps)
}

View File

@ -7,6 +7,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/shaman/filestore"
"projects.blender.org/studio/flamenco/pkg/shaman/testsupport"
@ -43,20 +44,33 @@ func TestCheckout(t *testing.T) {
assert.FileExists(t, filepath.Join(coPath, "httpstuff.py"))
assert.FileExists(t, filepath.Join(coPath, "много ликова.py"))
storePath := manager.fileStore.StoragePath()
assertLinksTo(t, filepath.Join(coPath, "subdir", "replacer.py"),
filepath.Join(storePath, "59", "0c148428d5c35fab3ebad2f3365bb469ab9c531b60831f3e826c472027a0b9", "3367.blob"))
assertLinksTo(t, filepath.Join(coPath, "feed.py"),
filepath.Join(storePath, "80", "b749c27b2fef7255e7e7b3c2029b03b31299c75ff1f1c72732081c70a713a3", "7488.blob"))
assertLinksTo(t, filepath.Join(coPath, "httpstuff.py"),
filepath.Join(storePath, "91", "4853599dd2c351ab7b82b219aae6e527e51518a667f0ff32244b0c94c75688", "486.blob"))
assertLinksTo(t, filepath.Join(coPath, "много ликова.py"),
filepath.Join(storePath, "d6", "fc7289b5196cc96748ea72f882a22c39b8833b457fe854ef4c03a01f5db0d3", "7217.blob"))
storePath, err := filepath.Rel(
manager.checkoutBasePath,
manager.fileStore.StoragePath(),
)
require.NoError(t, err)
assertLinksTo(t,
// Two extra '..' for 'á hausinn á þér/subdir'.
filepath.Join("..", "..", storePath, "59", "0c148428d5c35fab3ebad2f3365bb469ab9c531b60831f3e826c472027a0b9", "3367.blob"),
filepath.Join(coPath, "subdir", "replacer.py"),
)
assertLinksTo(t,
filepath.Join("..", storePath, "80", "b749c27b2fef7255e7e7b3c2029b03b31299c75ff1f1c72732081c70a713a3", "7488.blob"),
filepath.Join(coPath, "feed.py"),
)
assertLinksTo(t,
filepath.Join("..", storePath, "91", "4853599dd2c351ab7b82b219aae6e527e51518a667f0ff32244b0c94c75688", "486.blob"),
filepath.Join(coPath, "httpstuff.py"),
)
assertLinksTo(t,
filepath.Join("..", storePath, "d6", "fc7289b5196cc96748ea72f882a22c39b8833b457fe854ef4c03a01f5db0d3", "7217.blob"),
filepath.Join(coPath, "много ликова.py"),
)
}
func assertLinksTo(t *testing.T, linkPath, expectedTarget string) {
func assertLinksTo(t *testing.T, expectedTarget, linkPath string) {
actualTarget, err := os.Readlink(linkPath)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, expectedTarget, actualTarget)
}

View File

@ -202,22 +202,46 @@ func (m *Manager) EraseCheckout(checkoutID string) error {
// It does *not* do any validation of the validity of the paths!
func (m *Manager) SymlinkToCheckout(blobPath, checkoutPath, symlinkRelativePath string) error {
symlinkPath := filepath.Join(checkoutPath, symlinkRelativePath)
logger := log.With().
Str("blobPath", blobPath).
Str("symlinkPath", symlinkPath).
Logger()
logger := log.With().Str("symlinkPath", symlinkPath).Logger()
blobPath, err := filepath.Abs(blobPath)
blobAbsolute, err := filepath.Abs(blobPath)
if err != nil {
logger.Error().Err(err).Msg("shaman: unable to make blobPath absolute")
logger.Error().
Str("blobPath", blobPath).
Err(err).Msg("shaman: unable to make blobPath absolute")
return err
}
if blobAbsolute != blobPath {
logger.Trace().
Str("input", blobPath).
Str("absolute", blobAbsolute).
Msg("shaman: made blobpath absolute")
}
// Try and make blobAbsolute relative to the symlink target directory, so that
// mounting the symlinked storage at a different prefix works as well.
symlinkDir := filepath.Dir(symlinkPath)
blobRelativeToCheckout, err := filepath.Rel(symlinkDir, blobAbsolute)
if err != nil {
logger.Warn().
Str("blobPath", blobAbsolute).
AnErr("cause", err).
Msg("shaman: unable to make blobPath relative, will use absolute path")
blobRelativeToCheckout = blobAbsolute
}
logger.Trace().
Str("absolute", blobAbsolute).
Str("relative", blobRelativeToCheckout).
Str("symlinkDir", symlinkDir).
Msg("shaman: made blobpath relative")
logger = logger.With().Str("blobPath", blobRelativeToCheckout).Logger()
logger.Debug().Msg("shaman: creating symlink")
// This is expected to fail sometimes, because we don't create parent directories yet.
// We only create those when we get a failure from symlinking.
err = os.Symlink(blobPath, symlinkPath)
err = os.Symlink(blobRelativeToCheckout, symlinkPath)
switch {
case err == nil:
return nil
@ -232,7 +256,7 @@ func (m *Manager) SymlinkToCheckout(blobPath, checkoutPath, symlinkRelativePath
Msg("shaman: unable to create symlink as it already exists, but also it cannot be read")
return err
}
if linkTarget != blobPath {
if linkTarget != blobRelativeToCheckout {
logger.Error().
AnErr("symlinkError", err).
Str("alreadyLinkedFrom", linkTarget).
@ -251,7 +275,7 @@ func (m *Manager) SymlinkToCheckout(blobPath, checkoutPath, symlinkRelativePath
logger.Error().Err(err).Msg("shaman: unable to create parent directory")
return err
}
if err := os.Symlink(blobPath, symlinkPath); err != nil {
if err := os.Symlink(blobRelativeToCheckout, symlinkPath); err != nil {
logger.Error().Err(err).Msg("shaman: unable to create symlink, after creating parent directory")
return err
}
@ -263,7 +287,7 @@ func (m *Manager) SymlinkToCheckout(blobPath, checkoutPath, symlinkRelativePath
// Change the modification time of the blob to mark it as 'referenced' just now.
m.wg.Add(1)
go func() {
if err := touchFile(blobPath); err != nil {
if err := touchFile(blobAbsolute); err != nil {
logger.Warn().Err(err).Msg("shaman: unable to touch blob path")
}
m.wg.Done()

View File

@ -24,15 +24,18 @@ package checkout
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/mattn/go-colorable"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/shaman/config"
"projects.blender.org/studio/flamenco/pkg/shaman/filestore"
@ -40,6 +43,9 @@ import (
)
func createTestManager() (*Manager, func()) {
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
log.Logger = log.Output(output)
conf, confCleanup := config.CreateTestConfig()
fileStore := filestore.New(conf)
manager := NewManager(conf, fileStore)
@ -53,36 +59,40 @@ func TestSymlinkToCheckout(t *testing.T) {
defer cleanup()
// Fake an older file.
blobPath := filepath.Join(manager.checkoutBasePath, "jemoeder.blob")
err := ioutil.WriteFile(blobPath, []byte("op je hoofd"), 0600)
assert.NoError(t, err)
blobPath := filepath.Join(manager.fileStore.StoragePath(), "opjehoofd.blob")
err := os.WriteFile(blobPath, []byte("op je hoofd"), 0600)
require.NoError(t, err)
wayBackWhen := time.Now().Add(-time.Hour * 24 * 100)
err = os.Chtimes(blobPath, wayBackWhen, wayBackWhen)
assert.NoError(t, err)
require.NoError(t, err)
symlinkRelativePath := "path/to/jemoeder.txt"
symlinkRelativePath := "path/to/opjehoofd.txt"
err = manager.SymlinkToCheckout(blobPath, manager.checkoutBasePath, symlinkRelativePath)
assert.NoError(t, err)
require.NoError(t, err)
err = manager.SymlinkToCheckout(blobPath, manager.checkoutBasePath, symlinkRelativePath)
assert.NoError(t, err, "symlinking a file twice should not be an issue")
require.NoError(t, err, "symlinking a file twice should not be an issue")
// Wait for touch() calls to be done.
// Wait for the manager to be done updating mtimes.
manager.wg.Wait()
// The blob should have been touched to indicate it was referenced just now.
stat, err := os.Stat(blobPath)
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t,
stat.ModTime().After(wayBackWhen),
"File must be touched (%v must be later than %v)", stat.ModTime(), wayBackWhen)
symlinkPath := filepath.Join(manager.checkoutBasePath, symlinkRelativePath)
stat, err = os.Lstat(symlinkPath)
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t, stat.Mode()&os.ModeType == os.ModeSymlink,
"%v should be a symlink", symlinkPath)
contents, err := os.ReadFile(symlinkPath)
require.NoError(t, err)
assert.Equal(t, string(contents), "op je hoofd")
}
func TestPrepareCheckout(t *testing.T) {

View File

@ -27,6 +27,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -44,7 +45,7 @@ func TestReportRequirements(t *testing.T) {
}
response, err := manager.ReportRequirements(context.Background(), required)
assert.NoError(t, err)
require.NoError(t, err)
// We should not be required to upload the same file twice, so the duplicate
// should not be in the response.

View File

@ -23,7 +23,6 @@
package shaman
import (
"errors"
"fmt"
"io/fs"
"os"
@ -31,8 +30,12 @@ import (
"testing"
"time"
"github.com/mattn/go-colorable"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/shaman/config"
"projects.blender.org/studio/flamenco/pkg/shaman/filestore"
"projects.blender.org/studio/flamenco/pkg/shaman/jwtauth"
@ -40,6 +43,9 @@ import (
)
func createTestShaman() (*Server, func()) {
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
log.Logger = log.Output(output)
conf, confCleanup := config.CreateTestConfig()
shaman := NewServer(conf, jwtauth.AlwaysDeny{})
return shaman, confCleanup
@ -101,7 +107,7 @@ func TestGCFindOldFiles(t *testing.T) {
// Since all the links have just been created, nothing should be considered old.
ageThreshold := server.gcAgeThreshold()
old, err := server.gcFindOldFiles(ageThreshold, log.With().Str("test", "test").Logger())
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, mtimeMap{}, old)
// Make some files old, they should show up in a scan.
@ -111,7 +117,7 @@ func TestGCFindOldFiles(t *testing.T) {
makeOld(server, expectOld, "stored/dc/89f15de821ad1df3e78f8ef455e653a2d1862f2eb3f5ee78aa4ca68eb6fb35/781.blob")
old, err = server.gcFindOldFiles(ageThreshold, log.With().Str("package", "shaman/test").Logger())
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, expectOld, old)
}
@ -151,18 +157,18 @@ func TestGCComponents(t *testing.T) {
// No symlinks created yet, so this should report all the files in oldFiles.
oldFiles := copymap(expectOld)
err := server.gcFilterLinkedFiles(server.config.CheckoutPath(), oldFiles, log.With().Str("package", "shaman/test").Logger(), nil)
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, expectOld, oldFiles)
// Create some symlinks
checkoutInfo, err := server.checkoutMan.PrepareCheckout("checkoutID")
assert.NoError(t, err)
require.NoError(t, err)
err = server.checkoutMan.SymlinkToCheckout(absPaths["3367.blob"], server.config.CheckoutPath(),
filepath.Join(checkoutInfo.RelativePath, "use-of-3367.blob"))
assert.NoError(t, err)
require.NoError(t, err)
err = server.checkoutMan.SymlinkToCheckout(absPaths["781.blob"], extraCheckoutDir,
filepath.Join(checkoutInfo.RelativePath, "use-of-781.blob"))
assert.NoError(t, err)
require.NoError(t, err)
// There should only be two old file reported now.
expectRemovable := mtimeMap{
@ -173,17 +179,17 @@ func TestGCComponents(t *testing.T) {
stats := GCStats{}
err = server.gcFilterLinkedFiles(server.config.CheckoutPath(), oldFiles, log.With().Str("package", "shaman/test").Logger(), &stats)
assert.Equal(t, 1, stats.numSymlinksChecked) // 1 is in checkoutPath, the other in extraCheckoutDir
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, len(expectRemovable)+1, len(oldFiles)) // one file is linked from the extra checkout dir
err = server.gcFilterLinkedFiles(extraCheckoutDir, oldFiles, log.With().Str("package", "shaman/test").Logger(), &stats)
assert.Equal(t, 2, stats.numSymlinksChecked) // 1 is in checkoutPath, the other in extraCheckoutDir
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, expectRemovable, oldFiles)
// Touching a file before requesting deletion should not delete it.
now := time.Now()
err = os.Chtimes(absPaths["6001.blob"], now, now)
assert.NoError(t, err)
require.NoError(t, err)
// Running the garbage collector should only remove that one unused and untouched file.
assert.FileExists(t, absPaths["6001.blob"], "file should exist before GC")
@ -198,7 +204,7 @@ func TestGCComponents(t *testing.T) {
assert.FileExists(t, absPaths["6001.blob"], "file should exist after GC")
assert.FileExists(t, absPaths["781.blob"], "file should exist after GC")
_, err = os.Stat(absPaths["7488.blob"])
assert.True(t, errors.Is(err, fs.ErrNotExist), "file %s should NOT exist after GC", absPaths["7488.blob"])
assert.ErrorIs(t, err, fs.ErrNotExist, "file %s should NOT exist after GC", absPaths["7488.blob"])
}
// Test of the high-level GCStorage() function.
@ -228,13 +234,13 @@ func TestGarbageCollect(t *testing.T) {
// Create some symlinks
checkoutInfo, err := server.checkoutMan.PrepareCheckout("checkoutID")
assert.NoError(t, err)
require.NoError(t, err)
err = server.checkoutMan.SymlinkToCheckout(absPaths["3367.blob"], server.config.CheckoutPath(),
filepath.Join(checkoutInfo.RelativePath, "use-of-3367.blob"))
assert.NoError(t, err)
require.NoError(t, err)
err = server.checkoutMan.SymlinkToCheckout(absPaths["781.blob"], extraCheckoutDir,
filepath.Join(checkoutInfo.RelativePath, "use-of-781.blob"))
assert.NoError(t, err)
require.NoError(t, err)
// Running the garbage collector should only remove those two unused files.
assert.FileExists(t, absPaths["6001.blob"], "file should exist before GC")
@ -244,9 +250,9 @@ func TestGarbageCollect(t *testing.T) {
assert.FileExists(t, absPaths["7488.blob"], "file should exist after dry-run GC")
server.GCStorage(false)
_, err = os.Stat(absPaths["6001.blob"])
assert.True(t, errors.Is(err, fs.ErrNotExist), "file %s should NOT exist after GC", absPaths["6001.blob"])
assert.ErrorIs(t, err, fs.ErrNotExist, "file %s should NOT exist after GC", absPaths["6001.blob"])
_, err = os.Stat(absPaths["7488.blob"])
assert.True(t, errors.Is(err, fs.ErrNotExist), "file %s should NOT exist after GC", absPaths["7488.blob"])
assert.ErrorIs(t, err, fs.ErrNotExist, "file %s should NOT exist after GC", absPaths["7488.blob"])
// Used files should still exist.
assert.FileExists(t, absPaths["781.blob"])

View File

@ -23,7 +23,6 @@
package config
import (
"io/ioutil"
"os"
"path/filepath"
"time"
@ -31,7 +30,7 @@ import (
// CreateTestConfig creates a configuration + cleanup function.
func CreateTestConfig() (conf Config, cleanup func()) {
tempDir, err := ioutil.TempDir("", "shaman-test-")
tempDir, err := os.MkdirTemp("", "shaman-test-")
if err != nil {
panic(err)
}

View File

@ -33,6 +33,7 @@ import (
"projects.blender.org/studio/flamenco/pkg/shaman/hasher"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"projects.blender.org/studio/flamenco/pkg/shaman/filestore"
)
@ -79,14 +80,14 @@ func TestStoreFile(t *testing.T) {
// The correct checksum should be accepted.
err = testWithChecksum(correctChecksum, filesize)
assert.NoError(t, err)
require.NoError(t, err)
path, status = server.fileStore.ResolveFile(correctChecksum, filesize, filestore.ResolveEverything)
assert.Equal(t, filestore.StatusStored, status)
assert.FileExists(t, path)
savedContent, err := ioutil.ReadFile(path)
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, payload, savedContent, "The file should be saved uncompressed")
}

View File

@ -29,6 +29,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// mustCreateFile creates an empty file.
@ -105,17 +106,17 @@ func TestOpenForUpload(t *testing.T) {
fileSize := int64(len(contents))
file, err := store.OpenForUpload("abcdefxxx", fileSize)
assert.NoError(t, err)
require.NoError(t, err)
_, err = file.Write(contents)
assert.NoError(t, err)
assert.NoError(t, file.Close())
require.NoError(t, err)
require.NoError(t, file.Close())
foundPath, status := store.ResolveFile("abcdefxxx", fileSize, ResolveEverything)
assert.Equal(t, file.Name(), foundPath)
assert.Equal(t, StatusUploading, status)
readContents, err := ioutil.ReadFile(foundPath)
assert.NoError(t, err)
require.NoError(t, err)
assert.EqualValues(t, contents, readContents)
}
@ -130,14 +131,14 @@ func TestMoveToStored(t *testing.T) {
assert.Error(t, err)
file, err := store.OpenForUpload("abcdefxxx", fileSize)
assert.NoError(t, err)
require.NoError(t, err)
_, err = file.Write(contents)
assert.NoError(t, err)
assert.NoError(t, file.Close())
require.NoError(t, err)
require.NoError(t, file.Close())
tempLocation := file.Name()
err = store.MoveToStored("abcdefxxx", fileSize, file.Name())
assert.NoError(t, err, "moving file %s", file.Name())
require.NoError(t, err, "moving file %s", file.Name())
foundPath, status := store.ResolveFile("abcdefxxx", fileSize, ResolveEverything)
assert.NotEqual(t, file.Name(), foundPath)

View File

@ -29,6 +29,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestStoragePrefix(t *testing.T) {
@ -61,7 +62,7 @@ func TestFilePermissions(t *testing.T) {
t.SkipNow()
}
dirname, err := os.MkdirTemp("", "file-permission-test")
assert.NoError(t, err)
require.NoError(t, err)
defer os.RemoveAll(dirname)
bin := storageBin{
@ -71,11 +72,11 @@ func TestFilePermissions(t *testing.T) {
}
file, err := bin.openForWriting("testfilename.blend")
assert.NoError(t, err)
require.NoError(t, err)
defer file.Close()
filestat, err := file.Stat()
assert.NoError(t, err)
require.NoError(t, err)
// The exact permissions depend on the current (unittest) process umask. This
// umask is not easy to get, which is why we have a copy of `tempfile.go` in

View File

@ -29,6 +29,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestTouch(t *testing.T) {
@ -46,7 +47,7 @@ func TestTouch(t *testing.T) {
assert.Nil(t, Touch(testPath))
stat, err := os.Stat(testPath)
assert.NoError(t, err)
require.NoError(t, err)
threshold := time.Now().Add(-5 * time.Second)
assert.True(t, stat.ModTime().After(threshold),

View File

@ -34,16 +34,6 @@
<span @click="copyElementText" class="click-to-copy">{{ jobData.id }}</span>
</dd>
<template v-if="workerTag">
<!-- TODO: fetch tag name and show that instead, and allow editing of the tag. -->
<dt class="field-name" title="Worker Tag">Tag</dt>
<dd :title="workerTag.description">
<span @click="copyElementData" class="click-to-copy" :data-clipboard="workerTag.id">{{
workerTag.name
}}</span>
</dd>
</template>
<dt class="field-name" title="Name">Name</dt>
<dd>{{ jobData.name }}</dd>
@ -52,9 +42,15 @@
{{ jobData.status }}
</dd>
<dt class="field-type" title="Type">Type</dt>
<dt class="field-type" title="Job Type">Job Type</dt>
<dd>{{ jobType ? jobType.label : jobData.type }}</dd>
<dt class="field-worker-tag" title="Worker Tag">Worker Tag</dt>
<dd v-if="workerTag" :title="workerTag.description">
{{ workerTag.name }}
</dd>
<dd v-else class="no-worker-tag">All Workers</dd>
<dt class="field-priority" title="Priority">Priority</dt>
<dd>
<PopoverEditableJobPriority :jobId="jobData.id" :priority="jobData.priority" />
@ -289,4 +285,8 @@ export default {
color: var(--indicator-color);
font-weight: bold;
}
dd.no-worker-tag {
color: var(--color-text-muted);
}
</style>

View File

@ -38,7 +38,7 @@ software. Development is supported by the Blender project.
### Simple and Portable
Flamenco consists of a few components and requires almost no configuration
Flamenco consists of a few components and requires almost no configuration in order
to be used in production.
{{< /columns >}}
@ -48,13 +48,13 @@ to be used in production.
### Easy Customization
Designed to be customizable, Flamenco allows TDs to specify
Job Types using the JavaScript language and seamlessly fit into the pipeline.
Job Types are specified using the JavaScript language and seamlessly fit into a pipeline.
<--->
### Cross-platform and Self-hosted
Flamenco runs on all major operating system, and is fully hosted on your own hardware.
Flamenco runs on all major operating systems, and is self hostable.
Your data is yours, and yours alone.
{{< /columns >}}
@ -63,7 +63,7 @@ Your data is yours, and yours alone.
### Robust Technology
The core of Flamenco is build using Go and SQLite. Check out the sources on
The core of Flamenco is built using Go and SQLite. Check out the source code on
[projects.blender.org](https://projects.blender.org/studio/flamenco).
<--->
@ -71,6 +71,6 @@ The core of Flamenco is build using Go and SQLite. Check out the sources on
### In Development
Flamenco v3 is in active development at Blender Studio. Join
[the chat](https://blender.chat/channel/flamenco) to see what's happening!
[the chat](https://chat.blender.org/#/room/#flamenco:blender.org) to see what's happening!
{{< /columns >}}

View File

@ -15,7 +15,7 @@ Join the community on the [#flamenco channel][chat] of Blender Chat to discuss
development topics. New faces are always welcome! Also, make sure you check out
the [quickstart guide](/usage/quickstart/).
[chat]: https://blender.chat/channel/flamenco
[chat]: https://chat.blender.org/#/room/#flamenco:blender.org
<--->

View File

@ -224,3 +224,13 @@ well, and thus `\` becomes `\\`.
In other words, even though it looks strange, this is not a bug in Flamenco. The
aim is to prevent you from seeing these doublings as little as possible, but
unfortunately it cannot always be avoided.
### Assets are missing!
When your blend file references your assets (textures, linked blend files, etc.)
with an absolute path, **Flamenco assumes that this path is valid for all
Workers, and will not copy those assets to the shared storage.** This makes it
possible to store large files, like simulation caches, on the shared storage,
without Flamenco creating a copy for each render job.
Read more on this in [Absolute vs. Relative Paths](/usage/shared-storage/#absolute-vs-relative-paths).

View File

@ -29,5 +29,5 @@ directory if necessary. Then restart Flamenco Manager and in Blender press the
[jobtypes]: {{< ref "usage/job-types" >}}
[built-in-scripts]: https://projects.blender.org/studio/flamenco/src/branch/main/internal/manager/job_compilers/scripts
[flamencochannel]: https://blender.chat/channel/flamenco
[flamencochannel]: https://chat.blender.org/#/room/#flamenco:blender.org
[tracker]: https://projects.blender.org/studio/flamenco/issues/new?template=.gitea%2fissue_template%2fjobtype.yaml

View File

@ -0,0 +1,40 @@
---
title: Built-in Job Types
weight: 10
---
Flamenco comes with built-in job types that are used for most common tasks. Currently, there are two of them:
- Simple Blender Render
- Single Image Render
## Simple Blender Render
This built-in job type is used for rendering a sequence of frames from a single Blender file, and potentially creating a preview video for compatible formats using FFmpeg. This job type is suitable for straightforward rendering tasks where one needs to render a range of frames and potentially compile them into a video. Note that this job type does not render into video formats directly, so the output format should be FFmpeg-compatible image formats.
The job type defines several settings that can be configured when submitting a job:
- `Frames` _string, required_: The frame range to render, e.g. '47', '1-30', '3, 5-10, 47-327'. It could also be set to use scene range or automatically determined on submission.
- `Chunk Size` _integer, default: 1_: Number of frames to render in one Blender render task.
- `Render Output Root` _string, required_: Base directory where render output is stored. Job-specific parts will be appended to this path.
- `Add Path Components` _integer, required, default: 0_: Number of path components from the current blend file to use in the render output path.
- `Render Output Path` _non-editable_: Final file path where render output will be saved. This is a computed value based on the `Render Output Root` and `Add Path Components` settings.
By using this job type, you can easily distribute Blender rendering tasks across multiple workers in your Flamenco setup, potentially saving significant time on large rendering projects.
## Single Image Render
This built-in job type is designed for distributed rendering of a single image from a Blender file. It splits the image into tiles, renders each tile separately, and then merges the tiles back into a single image. This approach allows for parallel processing of different parts of the image, potentially speeding up the rendering process.
Currently, the job type supports composition, as long as there is one single `Render Layers` node. The job type does not support `Denoising` node.
The job type defines several settings that can be configured when submitting a job:
- `Tile Size X` _integer, default: 64: Tile size in pixels for the X axis, does not need to be divisible by the image width.
- `Tile Size Y` _integer, default: 64: Tile size in pixels for the Y axis, does not need to be divisible by the image height.
- `Frame` _integer, required_: The frame to render. By default, it uses the current frame in the Blender scene.
- `Render Output Root` _string, required_: Base directory where render output is stored. Job-specific parts will be appended to this path.
- `Add Path Components` _integer, required, default: 0_: Number of path components from the current blend file to use in the render output path.
- `Render Output Path` _non-editable_: Final file path where render output will be saved. This is a computed value based on the `Render Output Root` and `Add Path Components` settings.
Choosing the right tile size is crucial for performance. Too small tiles might increase overhead, while too large tiles might not distribute the workload effectively.

View File

@ -26,7 +26,7 @@ The constructed CLI invocation will be `{exe} {exeArgs} {argsBefore} {blendfile}
Flamenco Worker monitors the logging of Blender; lines like `Saved: filename.jpg` are recognised and sent as preview images to Flamenco Manager.
## FFmpeg: `create-video`
## FFmpeg: `frames-to-video`
Uses FFmpeg to convert an image sequence to a video file.

View File

@ -92,3 +92,26 @@ have arrived on a specific worker, without waiting for *all* syncing to be
completed (as someone may have just submitted another job).
[jobtypes]: {{< ref "/usage/job-types" >}}
## Absolute vs. Relative Paths
Blender can reference assets (textures, linked blend files, etc.) in two ways:
- by **relative path**, like `//textures\my-favourite-brick.exr`, which is relative to the blend file, or
- by **absolute path**, like `D:\texture-library\my-favourite-brick.exr`, which is the full path of the file.
When an asset is referenced by an absolute path, **Flamenco assumes that this
path is valid for all Workers, and will not copy those assets to the shared
storage.** This makes it possible to store large files, like simulation caches,
on the shared storage, without Flamenco creating a copy for each render job.
{{< hint type=Warning >}} On Windows it is not possible to construct a relative
path to an asset when that asset is no a different drive than the main blend
file. If you still want Flamenco to copy such assets, there are two workarounds:
- Move your asset libraries to the same drive as your Blender projects.
- Use [symbolic links][symlinks-guide-windows] to make your assets available at
a suitable path.
[symlinks-guide-windows]: https://www.howtogeek.com/16226/complete-guide-to-symbolic-links-symlinks-on-windows-or-linux/
{{< /hint >}}