Support pausing jobs #104313

Manually merged
Sybren A. Stüvel merged 28 commits from David-Zhang-10/flamenco:paused-job-status into main 2024-07-01 17:53:44 +02:00
31 changed files with 1144 additions and 375 deletions
Showing only changes of commit 8e1f481bf1 - Show all commits

View File

@ -8,6 +8,9 @@ bugs in actually-released versions.
- Add `label` to job settings, to have full control over how they are presented in Blender's job submission GUI. If a job setting does not define a label, its `key` is used to generate one (like Flamenco 3.5 and older).
- Add `shellSplit(someString)` function to the job compiler scripts. This splits a string into an array of strings using shell/CLI semantics.
- Make it possible to script job submissions in Blender, by executing the `bpy.ops.flamenco.submit_job(job_name="jobname")` operator.
- Security updates of some deendencies:
- [GO-2024-2937: Parsing a corrupt or malicious image with invalid color indices can cause a panic](https://pkg.go.dev/vuln/GO-2024-2937)
## 3.5 - released 2024-04-16

View File

@ -4,7 +4,7 @@ PKG := projects.blender.org/studio/flamenco
# To update the version number in all the relevant places, update the VERSION
# variable below and run `make update-version`.
VERSION := 3.6-alpha0
VERSION := 3.6-alpha3
# "alpha", "beta", or "release".
RELEASE_CYCLE := alpha
@ -355,6 +355,7 @@ release-package:
$(MAKE) -s release-package-linux
$(MAKE) -s release-package-darwin
$(MAKE) -s release-package-windows
$(MAKE) -s clean
.PHONY: release-package-linux
release-package-linux:

View File

@ -12,7 +12,7 @@ bl_info = {
"doc_url": "https://flamenco.blender.org/",
"category": "System",
"support": "COMMUNITY",
"warning": "This is version 3.6-alpha0 of the add-on, which is not a stable release",
"warning": "This is version 3.6-alpha3 of the add-on, which is not a stable release",
}
from pathlib import Path

View File

@ -180,9 +180,9 @@ class PackThread(threading.Thread):
def poll(self, timeout: Optional[int] = None) -> Optional[Message]:
"""Poll the queue, return the first message or None if there is none.
:param timeout: Max time to wait for a message to appear on the queue.
If None, will not wait and just return None immediately (if there is
no queued message).
:param timeout: Max time to wait for a message to appear on the queue,
in seconds. If None, will not wait and just return None immediately
(if there is no queued message).
"""
try:
return self.queue.get(block=timeout is not None, timeout=timeout)

View File

@ -382,10 +382,12 @@ def _job_type_to_class_name(job_type_name: str) -> str:
def _job_setting_label(setting: _AvailableJobSetting) -> str:
"""Return a suitable label for this job setting."""
label = setting.get("label", default="")
label = str(setting.get("label", default=""))
if label:
return label
return setting.key.title().replace("_", " ")
generated_label: str = setting.key.title().replace("_", " ")
return generated_label
def _set_if_available(

View File

@ -10,7 +10,7 @@
"""
__version__ = "3.6-alpha0"
__version__ = "3.6-alpha3"
# import ApiClient
from flamenco.manager.api_client import ApiClient

View File

@ -76,7 +76,7 @@ class ApiClient(object):
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = 'Flamenco/3.6-alpha0 (Blender add-on)'
self.user_agent = 'Flamenco/3.6-alpha3 (Blender add-on)'
def __enter__(self):
return self

View File

@ -404,7 +404,7 @@ conf = flamenco.manager.Configuration(
"OS: {env}\n"\
"Python Version: {pyversion}\n"\
"Version of the API: 1.0.0\n"\
"SDK Package Version: 3.6-alpha0".\
"SDK Package Version: 3.6-alpha3".\
format(env=sys.platform, pyversion=sys.version)
def get_host_settings(self):

View File

@ -4,7 +4,7 @@ Render Farm manager API
The `flamenco.manager` package is automatically generated by the [OpenAPI Generator](https://openapi-generator.tech) project:
- API version: 1.0.0
- Package version: 3.6-alpha0
- Package version: 3.6-alpha3
- Build package: org.openapitools.codegen.languages.PythonClientCodegen
For more information, please visit [https://flamenco.blender.org/](https://flamenco.blender.org/)

View File

@ -128,31 +128,50 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
job_type = job_types.active_job_type(context.scene)
return job_type is not None
def execute(self, context: bpy.types.Context) -> set[str]:
filepath, ok = self._presubmit_check(context)
if not ok:
return {"CANCELLED"}
is_running = self._submit_files(context, filepath)
if not is_running:
return {"CANCELLED"}
if self.packthread is None:
# If there is no pack thread running, there isn't much we can do.
self.report({"ERROR"}, "No pack thread running, please report a bug")
self._quit(context)
return {"CANCELLED"}
# Keep handling messages from the background thread.
while True:
# Block for 5 seconds at a time. The exact duration doesn't matter,
# as this while-loop is blocking the main thread anyway.
msg = self.packthread.poll(timeout=5)
if not msg:
# No message received, is fine, just wait for another one.
continue
result = self._on_bat_pack_msg(context, msg)
if "RUNNING_MODAL" not in result:
break
self._quit(context)
self.packthread.join(timeout=5)
return {"FINISHED"}
def invoke(self, context: bpy.types.Context, event: bpy.types.Event) -> set[str]:
# Before doing anything, make sure the info we cached about the Manager
# is up to date. A change in job storage directory on the Manager can
# cause nasty error messages when we submit, and it's better to just be
# ahead of the curve and refresh first. This also allows for checking
# the actual Manager version before submitting.
err = self._check_manager(context)
if err:
self.report({"WARNING"}, err)
filepath, ok = self._presubmit_check(context)
if not ok:
return {"CANCELLED"}
if not context.blend_data.filepath:
# The file path needs to be known before the file can be submitted.
self.report(
{"ERROR"}, "Please save your .blend file before submitting to Flamenco"
)
is_running = self._submit_files(context, filepath)
if not is_running:
return {"CANCELLED"}
filepath = self._save_blendfile(context)
# Check the job with the Manager, to see if it would be accepted.
if not self._check_job(context):
return {"CANCELLED"}
return self._submit_files(context, filepath)
context.window_manager.modal_handler_add(self)
return {"RUNNING_MODAL"}
def modal(self, context: bpy.types.Context, event: bpy.types.Event) -> set[str]:
# This function is called for TIMER events to poll the BAT pack thread.
@ -244,6 +263,39 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
return None
return manager
def _presubmit_check(self, context: bpy.types.Context) -> tuple[Path, bool]:
"""Do a pre-submission check, returning whether submission can continue.
Reports warnings when returning False, so the caller can just abort.
Returns a tuple (can_submit, filepath_to_submit)
"""
# Before doing anything, make sure the info we cached about the Manager
# is up to date. A change in job storage directory on the Manager can
# cause nasty error messages when we submit, and it's better to just be
# ahead of the curve and refresh first. This also allows for checking
# the actual Manager version before submitting.
err = self._check_manager(context)
if err:
self.report({"WARNING"}, err)
return Path(), False
if not context.blend_data.filepath:
# The file path needs to be known before the file can be submitted.
self.report(
{"ERROR"}, "Please save your .blend file before submitting to Flamenco"
)
return Path(), False
filepath = self._save_blendfile(context)
# Check the job with the Manager, to see if it would be accepted.
if not self._check_job(context):
return Path(), False
return filepath, True
def _save_blendfile(self, context):
"""Save to a different file, specifically for Flamenco.
@ -303,19 +355,22 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
return filepath
def _submit_files(self, context: bpy.types.Context, blendfile: Path) -> set[str]:
"""Ensure that the files are somewhere in the shared storage."""
def _submit_files(self, context: bpy.types.Context, blendfile: Path) -> bool:
"""Ensure that the files are somewhere in the shared storage.
Returns True if a packing thread has been started, and False otherwise.
"""
from .bat import interface as bat_interface
if bat_interface.is_packing():
self.report({"ERROR"}, "Another packing operation is running")
self._quit(context)
return {"CANCELLED"}
return False
manager = self._manager_info(context)
if not manager:
return {"CANCELLED"}
return False
if manager.shared_storage.shaman_enabled:
# self.blendfile_on_farm will be set when BAT created the checkout,
@ -335,13 +390,12 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
self.blendfile_on_farm = self._bat_pack_filesystem(context, blendfile)
except FileNotFoundError:
self._quit(context)
return {"CANCELLED"}
return False
context.window_manager.modal_handler_add(self)
wm = context.window_manager
self.timer = wm.event_timer_add(self.TIMER_PERIOD, window=context.window)
return {"RUNNING_MODAL"}
return True
def _bat_pack_filesystem(
self, context: bpy.types.Context, blendfile: Path

17
go.mod
View File

@ -28,10 +28,10 @@ require (
github.com/stretchr/testify v1.8.4
github.com/zcalusic/sysinfo v1.0.1
github.com/ziflex/lecho/v3 v3.1.0
golang.org/x/crypto v0.21.0
golang.org/x/image v0.10.0
golang.org/x/net v0.23.0
golang.org/x/sys v0.18.0
golang.org/x/crypto v0.23.0
golang.org/x/image v0.18.0
golang.org/x/net v0.25.0
golang.org/x/sys v0.20.0
gopkg.in/yaml.v2 v2.4.0
gorm.io/gorm v1.25.5
modernc.org/sqlite v1.28.0
@ -59,12 +59,11 @@ require (
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.1 // indirect
golang.org/toolchain v0.0.1-go1.9rc2.windows-amd64 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
golang.org/x/tools v0.16.1 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/uint128 v1.3.0 // indirect
modernc.org/cc/v3 v3.41.0 // indirect

44
go.sum
View File

@ -74,8 +74,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219 h1:utua3L2IbQJmauC5IXdEA547bcoU5dozgQAfc8Onsg4=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
@ -201,18 +201,17 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.10.0 h1:gXjUUtwtx5yOE0VKWq1CH4IJAClq4UGgUA3i+rpON9M=
golang.org/x/image v0.10.0/go.mod h1:jtrku+n79PfroUbvDdeUWMAI+heR786BofxrbiSF+J0=
golang.org/x/image v0.18.0 h1:jGzIakQa/ZXI1I0Fxvaa9W7yP25TqT6cHIHn+6CqvSQ=
golang.org/x/image v0.18.0/go.mod h1:4yyo5vMFQjVjUcVk4jEQcU9MGy/rulF5WvUILseCM2E=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@ -222,17 +221,15 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -260,24 +257,20 @@ golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@ -288,9 +281,8 @@ golang.org/x/tools v0.0.0-20210114065538-d78b04bdf963/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -12,6 +12,7 @@ import (
"time"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -134,3 +135,12 @@ func sendAPIErrorDBBusy(e echo.Context, message string, args ...interface{}) err
e.Response().Header().Set("Retry-After", strconv.FormatInt(seconds, 10))
return e.JSON(code, apiErr)
}
// handleConnectionClosed logs a message and sends a "418 I'm a teapot" response
// to the HTTP client. The response is likely to be seen, as the connection was
// closed. But just in case this function was called by mistake, it's a response
// code that is unlikely to be accepted by the client.
func handleConnectionClosed(e echo.Context, logger zerolog.Logger, logMessage string) error {
logger.Debug().Msg(logMessage)
return e.NoContent(http.StatusTeapot)
}

View File

@ -3,6 +3,7 @@ package api_impl
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"errors"
"net/http"
@ -16,7 +17,10 @@ import (
func (f *Flamenco) FetchWorkers(e echo.Context) error {
logger := requestLogger(e)
dbWorkers, err := f.persist.FetchWorkers(e.Request().Context())
if err != nil {
switch {
case errors.Is(err, context.Canceled):
return handleConnectionClosed(e, logger, "fetching all workers")
case err != nil:
logger.Error().Err(err).Msg("fetching all workers")
return sendAPIError(e, http.StatusInternalServerError, "error fetching workers: %v", err)
}
@ -42,18 +46,23 @@ func (f *Flamenco) FetchWorker(e echo.Context, workerUUID string) error {
ctx := e.Request().Context()
dbWorker, err := f.persist.FetchWorker(ctx, workerUUID)
if errors.Is(err, persistence.ErrWorkerNotFound) {
switch {
case errors.Is(err, persistence.ErrWorkerNotFound):
logger.Debug().Msg("non-existent worker requested")
return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID)
}
if err != nil {
case errors.Is(err, context.Canceled):
return handleConnectionClosed(e, logger, "fetching task assigned to worker")
case err != nil:
logger.Error().Err(err).Msg("fetching worker")
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker: %v", err)
}
dbTask, err := f.persist.FetchWorkerTask(ctx, dbWorker)
if err != nil {
logger.Error().Err(err).Msg("fetching task assigned to worker")
switch {
case errors.Is(err, context.Canceled):
return handleConnectionClosed(e, logger, "fetching task assigned to worker")
case err != nil:
logger.Error().AnErr("cause", err).Msg("fetching task assigned to worker")
return sendAPIError(e, http.StatusInternalServerError, "error fetching task assigned to worker: %v", err)
}
@ -86,11 +95,11 @@ func (f *Flamenco) DeleteWorker(e echo.Context, workerUUID string) error {
// Fetch the worker in order to re-queue its tasks.
worker, err := f.persist.FetchWorker(ctx, workerUUID)
if errors.Is(err, persistence.ErrWorkerNotFound) {
switch {
case errors.Is(err, persistence.ErrWorkerNotFound):
logger.Debug().Msg("deletion of non-existent worker requested")
return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID)
}
if err != nil {
case err != nil:
logger.Error().Err(err).Msg("fetching worker for deletion")
return sendAPIError(e, http.StatusInternalServerError,
"error fetching worker for deletion: %v", err)
@ -105,11 +114,11 @@ func (f *Flamenco) DeleteWorker(e echo.Context, workerUUID string) error {
// Actually delete the worker.
err = f.persist.DeleteWorker(ctx, workerUUID)
if errors.Is(err, persistence.ErrWorkerNotFound) {
switch {
case errors.Is(err, persistence.ErrWorkerNotFound):
logger.Debug().Msg("deletion of non-existent worker requested")
return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID)
}
if err != nil {
case err != nil:
logger.Error().Err(err).Msg("deleting worker")
return sendAPIError(e, http.StatusInternalServerError, "error deleting worker: %v", err)
}
@ -145,11 +154,13 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string)
// Fetch the worker.
dbWorker, err := f.persist.FetchWorker(e.Request().Context(), workerUUID)
if errors.Is(err, persistence.ErrWorkerNotFound) {
switch {
case errors.Is(err, context.Canceled):
return handleConnectionClosed(e, logger, "fetching worker")
case errors.Is(err, persistence.ErrWorkerNotFound):
logger.Debug().Msg("non-existent worker requested")
return sendAPIError(e, http.StatusNotFound, "worker %q not found", workerUUID)
}
if err != nil {
case err != nil:
logger.Error().Err(err).Msg("fetching worker")
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker: %v", err)
}
@ -168,6 +179,11 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string)
logger.Info().Msg("worker status change requested")
// All information to do the operation is known, so even when the client
// disconnects, the work should be completed.
ctx, ctxCancel := bgContext()
defer ctxCancel()
if dbWorker.Status == change.Status {
// Requesting that the worker should go to its current status basically
// means cancelling any previous status change request.
@ -177,7 +193,7 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string)
}
// Store the status change.
if err := f.persist.SaveWorker(e.Request().Context(), dbWorker); err != nil {
if err := f.persist.SaveWorker(ctx, dbWorker); err != nil {
logger.Error().Err(err).Msg("saving worker after status change request")
return sendAPIError(e, http.StatusInternalServerError, "error saving worker: %v", err)
}
@ -221,6 +237,11 @@ func (f *Flamenco) SetWorkerTags(e echo.Context, workerUUID string) error {
Logger()
logger.Info().Msg("worker tag change requested")
// All information to do the operation is known, so even when the client
// disconnects, the work should be completed.
ctx, ctxCancel := bgContext()
defer ctxCancel()
// Store the new tag assignment.
if err := f.persist.WorkerSetTags(ctx, dbWorker, change.TagIds); err != nil {
logger.Error().Err(err).Msg("saving worker after tag change request")

View File

@ -517,6 +517,23 @@ func (c *Conf) GetTwoWayVariables(audience VariableAudience, platform VariablePl
return twoWayVars
}
// GetOneWayVariables returns the regular (one-way) variable values for this (audience,
// platform) combination. If no variables are found, just returns an empty map.
// If a value is defined for both the "all" platform and specifically the given
// platform, the specific platform definition wins.
func (c *Conf) GetOneWayVariables(audience VariableAudience, platform VariablePlatform) map[string]string {
varsForPlatform := c.getVariables(audience, platform)
// Only keep the two-way variables.
oneWayVars := map[string]string{}
for varname, value := range varsForPlatform {
if !c.isTwoWay(varname) {
oneWayVars[varname] = value
}
}
return oneWayVars
}
// ResolveVariables returns the variables for this (audience, platform) combination.
// If no variables are found, just returns an empty map. If a value is defined
// for both the "all" platform and specifically the given platform, the specific

View File

@ -39,17 +39,8 @@ func (c *Conf) NewVariableToValueConverter(audience VariableAudience, platform V
// NewVariableExpander returns a new VariableExpander for the given audience & platform.
func (c *Conf) NewVariableExpander(audience VariableAudience, platform VariablePlatform) *VariableExpander {
// Get the variables for the given audience & platform.
varsForPlatform := c.getVariables(audience, platform)
if len(varsForPlatform) == 0 {
log.Warn().
Str("audience", string(audience)).
Str("platform", string(platform)).
Msg("no variables defined for this platform given this audience")
}
return &VariableExpander{
oneWayVars: varsForPlatform,
oneWayVars: c.GetOneWayVariables(audience, platform),
managerTwoWayVars: c.GetTwoWayVariables(audience, c.currentGOOS),
targetTwoWayVars: c.GetTwoWayVariables(audience, platform),
targetPlatform: platform,
@ -89,15 +80,16 @@ func isValueMatch(valueToMatch, variableValue string) bool {
return true
}
// If the variable value has a backslash, assume it is a Windows path.
// If any of the values have backslashes, assume it is a Windows path.
// Convert it to slash notation just to see if that would provide a
// match.
if strings.ContainsRune(variableValue, '\\') {
slashedValue := crosspath.ToSlash(variableValue)
return strings.HasPrefix(valueToMatch, slashedValue)
variableValue = crosspath.ToSlash(variableValue)
}
return false
if strings.ContainsRune(valueToMatch, '\\') {
valueToMatch = crosspath.ToSlash(valueToMatch)
}
return strings.HasPrefix(valueToMatch, variableValue)
}
// Replace converts "{variable name}" to the value that belongs to the audience and platform.
@ -110,6 +102,17 @@ func (ve *VariableExpander) Expand(valueToExpand string) string {
expanded = strings.Replace(expanded, placeholder, varvalue, -1)
}
// Go through the two-way variables for the target platform.
isPathValue := false
for varname, varvalue := range ve.targetTwoWayVars {
placeholder := fmt.Sprintf("{%s}", varname)
expanded = strings.Replace(expanded, placeholder, varvalue, -1)
// Since two-way variables are meant for path replacement, we know this
// should be a path.
isPathValue = true
}
// Go through the two-way variables, to make sure that the result of
// expanding variables gets the two-way variables applied as well. This is
// necessary to make implicitly-defined variable, which are only defined for
@ -117,7 +120,6 @@ func (ve *VariableExpander) Expand(valueToExpand string) string {
//
// Practically, this replaces "value for the Manager platform" with "value
// for the target platform".
isPathValue := false
for varname, managerValue := range ve.managerTwoWayVars {
targetValue, ok := ve.targetTwoWayVars[varname]
if !ok {
@ -137,6 +139,11 @@ func (ve *VariableExpander) Expand(valueToExpand string) string {
expanded = crosspath.ToPlatform(expanded, string(ve.targetPlatform))
}
log.Trace().
Str("valueToExpand", valueToExpand).
Str("expanded", expanded).
Bool("isPathValue", isPathValue).
Msg("expanded variable")
return expanded
}

View File

@ -6,7 +6,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestReplaceTwowayVariables(t *testing.T) {
func TestReplaceTwowayVariablesMixedSlashes(t *testing.T) {
c := DefaultConfig(func(c *Conf) {
c.Variables["shared"] = Variable{
IsTwoWay: true,
@ -17,10 +17,36 @@ func TestReplaceTwowayVariables(t *testing.T) {
}
})
replacer := c.NewVariableToValueConverter(VariableAudienceUsers, VariablePlatformWindows)
replacerWin := c.NewVariableToValueConverter(VariableAudienceWorkers, VariablePlatformWindows)
replacerLnx := c.NewVariableToValueConverter(VariableAudienceWorkers, VariablePlatformLinux)
// This is the real reason for this test: forward slashes in the path should
// still be matched to the backslashes in the variable value.
assert.Equal(t, `{shared}\shot\file.blend`, replacer.Replace(`Y:\shared\flamenco\shot\file.blend`))
assert.Equal(t, `{shared}/shot/file.blend`, replacer.Replace(`Y:/shared/flamenco/shot/file.blend`))
assert.Equal(t, `{shared}\shot\file.blend`, replacerWin.Replace(`Y:\shared\flamenco\shot\file.blend`))
assert.Equal(t, `{shared}/shot/file.blend`, replacerWin.Replace(`Y:/shared/flamenco/shot/file.blend`))
assert.Equal(t, `{shared}\shot\file.blend`, replacerLnx.Replace(`/shared\flamenco\shot\file.blend`))
assert.Equal(t, `{shared}/shot/file.blend`, replacerLnx.Replace(`/shared/flamenco/shot/file.blend`))
}
func TestExpandTwowayVariablesMixedSlashes(t *testing.T) {
c := DefaultConfig(func(c *Conf) {
c.Variables["shared"] = Variable{
IsTwoWay: true,
Values: []VariableValue{
{Value: "/shared/flamenco", Platform: VariablePlatformLinux},
{Value: `Y:\shared\flamenco`, Platform: VariablePlatformWindows},
},
}
})
expanderWin := c.NewVariableExpander(VariableAudienceWorkers, VariablePlatformWindows)
expanderLnx := c.NewVariableExpander(VariableAudienceWorkers, VariablePlatformLinux)
// Slashes should always be normalised for the target platform, on the entire path, not just the replaced part.
assert.Equal(t, `Y:\shared\flamenco\shot\file.blend`, expanderWin.Expand(`{shared}\shot\file.blend`))
assert.Equal(t, `Y:\shared\flamenco\shot\file.blend`, expanderWin.Expand(`{shared}/shot/file.blend`))
assert.Equal(t, `/shared/flamenco/shot/file.blend`, expanderLnx.Expand(`{shared}\shot\file.blend`))
assert.Equal(t, `/shared/flamenco/shot/file.blend`, expanderLnx.Expand(`{shared}/shot/file.blend`))
}

View File

@ -189,6 +189,41 @@ func (db *DB) queries() (*sqlc.Queries, error) {
return sqlc.New(&loggingWrapper), nil
}
type queriesTX struct {
queries *sqlc.Queries
commit func() error
rollback func() error
}
// queries returns the SQLC Queries struct, connected to this database.
// It is intended that all GORM queries will be migrated to use this interface
// instead.
//
// After calling this function, all queries should use this transaction until it
// is closed (either committed or rolled back). Otherwise SQLite will deadlock,
// as it will make any other query wait until this transaction is done.
func (db *DB) queriesWithTX() (*queriesTX, error) {
sqldb, err := db.gormDB.DB()
if err != nil {
return nil, fmt.Errorf("could not get low-level database driver: %w", err)
}
tx, err := sqldb.Begin()
if err != nil {
return nil, fmt.Errorf("could not begin database transaction: %w", err)
}
loggingWrapper := LoggingDBConn{tx}
qtx := queriesTX{
queries: sqlc.New(&loggingWrapper),
commit: tx.Commit,
rollback: tx.Rollback,
}
return &qtx, nil
}
// now returns the result of `nowFunc()` wrapped in a sql.NullTime.
func (db *DB) now() sql.NullTime {
return sql.NullTime{

View File

@ -147,37 +147,66 @@ type TaskFailure struct {
// StoreJob stores an AuthoredJob and its tasks, and saves it to the database.
// The job will be in 'under construction' status. It is up to the caller to transition it to its desired initial status.
func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error {
return db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// TODO: separate conversion of struct types from storing things in the database.
dbJob := Job{
UUID: authoredJob.JobID,
Name: authoredJob.Name,
JobType: authoredJob.JobType,
Status: authoredJob.Status,
Priority: authoredJob.Priority,
Settings: StringInterfaceMap(authoredJob.Settings),
Metadata: StringStringMap(authoredJob.Metadata),
Storage: JobStorageInfo{
ShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID,
},
}
// Find and assign the worker tag.
if authoredJob.WorkerTagUUID != "" {
dbTag, err := fetchWorkerTag(tx, authoredJob.WorkerTagUUID)
// Run all queries in a single transaction.
qtx, err := db.queriesWithTX()
if err != nil {
return err
}
dbJob.WorkerTagID = &dbTag.ID
dbJob.WorkerTag = dbTag
defer qtx.rollback()
// Serialise the embedded JSON.
settings, err := json.Marshal(authoredJob.Settings)
if err != nil {
return fmt.Errorf("converting job settings to JSON: %w", err)
}
metadata, err := json.Marshal(authoredJob.Metadata)
if err != nil {
return fmt.Errorf("converting job metadata to JSON: %w", err)
}
if err := tx.Create(&dbJob).Error; err != nil {
// Create the job itself.
params := sqlc.CreateJobParams{
CreatedAt: db.gormDB.NowFunc(),
UUID: authoredJob.JobID,
Name: authoredJob.Name,
JobType: authoredJob.JobType,
Priority: int64(authoredJob.Priority),
Status: string(authoredJob.Status),
Settings: settings,
Metadata: metadata,
StorageShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID,
}
if authoredJob.WorkerTagUUID != "" {
dbTag, err := qtx.queries.FetchWorkerTagByUUID(ctx, authoredJob.WorkerTagUUID)
switch {
case errors.Is(err, sql.ErrNoRows):
return fmt.Errorf("no worker tag %q found", authoredJob.WorkerTagUUID)
case err != nil:
return fmt.Errorf("could not find worker tag %q: %w", authoredJob.WorkerTagUUID, err)
}
params.WorkerTagID = sql.NullInt64{Int64: dbTag.WorkerTag.ID, Valid: true}
}
log.Debug().
Str("job", params.UUID).
Str("type", params.JobType).
Str("name", params.Name).
Str("status", params.Status).
Msg("persistence: storing authored job")
jobID, err := qtx.queries.CreateJob(ctx, params)
if err != nil {
return jobError(err, "storing job")
}
return db.storeAuthoredJobTaks(ctx, tx, &dbJob, &authoredJob)
})
err = db.storeAuthoredJobTaks(ctx, qtx, jobID, &authoredJob)
if err != nil {
return err
}
return qtx.commit()
}
// StoreAuthoredJobTaks is a low-level function that is only used for recreating an existing job's tasks.
@ -187,19 +216,41 @@ func (db *DB) StoreAuthoredJobTaks(
job *Job,
authoredJob *job_compilers.AuthoredJob,
) error {
tx := db.gormDB.WithContext(ctx)
return db.storeAuthoredJobTaks(ctx, tx, job, authoredJob)
qtx, err := db.queriesWithTX()
if err != nil {
return err
}
defer qtx.rollback()
err = db.storeAuthoredJobTaks(ctx, qtx, int64(job.ID), authoredJob)
if err != nil {
return err
}
return qtx.commit()
}
// storeAuthoredJobTaks stores the tasks of the authored job.
// Note that this function does NOT commit the database transaction. That is up
// to the caller.
func (db *DB) storeAuthoredJobTaks(
ctx context.Context,
tx *gorm.DB,
dbJob *Job,
qtx *queriesTX,
jobID int64,
authoredJob *job_compilers.AuthoredJob,
) error {
type TaskInfo struct {
ID int64
UUID string
Name string
}
uuidToTask := make(map[string]*Task)
// Give every task the same creation timestamp.
now := db.gormDB.NowFunc()
uuidToTask := make(map[string]TaskInfo)
for _, authoredTask := range authoredJob.Tasks {
// Marshal commands to JSON.
var commands []Command
for _, authoredCommand := range authoredTask.Commands {
commands = append(commands, Command{
@ -207,22 +258,41 @@ func (db *DB) storeAuthoredJobTaks(
Parameters: StringInterfaceMap(authoredCommand.Parameters),
})
}
commandsJSON, err := json.Marshal(commands)
if err != nil {
return fmt.Errorf("could not convert commands of task %q to JSON: %w",
authoredTask.Name, err)
}
dbTask := Task{
taskParams := sqlc.CreateTaskParams{
CreatedAt: now,
Name: authoredTask.Name,
Type: authoredTask.Type,
UUID: authoredTask.UUID,
Job: dbJob,
Priority: authoredTask.Priority,
Status: api.TaskStatusQueued,
Commands: commands,
JobID: jobID,
Priority: int64(authoredTask.Priority),
Status: string(api.TaskStatusQueued),
Commands: commandsJSON,
// dependencies are stored below.
}
if err := tx.Create(&dbTask).Error; err != nil {
log.Debug().
Str("task", taskParams.UUID).
Str("type", taskParams.Type).
Str("name", taskParams.Name).
Str("status", string(taskParams.Status)).
Msg("persistence: storing authored task")
taskID, err := qtx.queries.CreateTask(ctx, taskParams)
if err != nil {
return taskError(err, "storing task: %v", err)
}
uuidToTask[authoredTask.UUID] = &dbTask
uuidToTask[authoredTask.UUID] = TaskInfo{
ID: taskID,
UUID: taskParams.UUID,
Name: taskParams.Name,
}
}
// Store the dependencies between tasks.
@ -231,32 +301,39 @@ func (db *DB) storeAuthoredJobTaks(
continue
}
dbTask, ok := uuidToTask[authoredTask.UUID]
taskInfo, ok := uuidToTask[authoredTask.UUID]
if !ok {
return taskError(nil, "unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
}
deps := make([]*Task, len(authoredTask.Dependencies))
for i, t := range authoredTask.Dependencies {
depTask, ok := uuidToTask[t.UUID]
deps := make([]*TaskInfo, len(authoredTask.Dependencies))
for idx, authoredDep := range authoredTask.Dependencies {
depTask, ok := uuidToTask[authoredDep.UUID]
if !ok {
return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID)
return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", authoredDep.UUID)
}
deps[i] = depTask
err := qtx.queries.StoreTaskDependency(ctx, sqlc.StoreTaskDependencyParams{
TaskID: taskInfo.ID,
DependencyID: depTask.ID,
})
if err != nil {
return taskError(err, "error storing task %q depending on task %q", authoredTask.UUID, depTask.UUID)
}
dependenciesbatchsize := 1000
for j := 0; j < len(deps); j += dependenciesbatchsize {
end := j + dependenciesbatchsize
if end > len(deps) {
end = len(deps)
deps[idx] = &depTask
}
currentDeps := deps[j:end]
dbTask.Dependencies = currentDeps
tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID)
subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps})
if subQuery.Error != nil {
return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end)
if log.Debug().Enabled() {
depNames := make([]string, len(deps))
for i, dep := range deps {
depNames[i] = dep.Name
}
log.Debug().
Str("task", taskInfo.UUID).
Str("name", taskInfo.Name).
Strs("dependencies", depNames).
Msg("persistence: storing authored task dependencies")
}
}
@ -278,7 +355,11 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
return nil, jobError(err, "fetching job")
}
return convertSqlcJob(sqlcJob)
gormJob, err := convertSqlcJob(sqlcJob)
if err != nil {
return nil, err
}
return &gormJob, nil
}
// FetchJobShamanCheckoutID fetches the job's Shaman Checkout ID.
@ -414,7 +495,7 @@ func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatu
if err != nil {
return nil, jobError(err, "converting fetched jobs in status %q", jobStatuses)
}
jobs = append(jobs, job)
jobs = append(jobs, &job)
}
return jobs, nil
@ -493,39 +574,60 @@ func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
return nil, taskError(err, "fetching task %s", taskUUID)
}
convertedTask, err := convertSqlcTask(taskRow.Task, taskRow.JobUUID.String, taskRow.WorkerUUID.String)
return convertSqlTaskWithJobAndWorker(ctx, queries, taskRow.Task)
}
// TODO: remove this code, and let the code that calls into the persistence
// service fetch the job/worker explicitly when needed.
func convertSqlTaskWithJobAndWorker(
ctx context.Context,
queries *sqlc.Queries,
task sqlc.Task,
) (*Task, error) {
var (
gormJob Job
gormWorker Worker
)
// Fetch & convert the Job.
if task.JobID > 0 {
sqlcJob, err := queries.FetchJobByID(ctx, task.JobID)
if err != nil {
return nil, jobError(err, "fetching job of task %s", task.UUID)
}
gormJob, err = convertSqlcJob(sqlcJob)
if err != nil {
return nil, jobError(err, "converting job of task %s", task.UUID)
}
}
// Fetch & convert the Worker.
if task.WorkerID.Valid && task.WorkerID.Int64 > 0 {
sqlcWorker, err := queries.FetchWorkerUnconditionalByID(ctx, task.WorkerID.Int64)
if err != nil {
return nil, taskError(err, "fetching worker assigned to task %s", task.UUID)
}
gormWorker = convertSqlcWorker(sqlcWorker)
}
// Convert the Task.
gormTask, err := convertSqlcTask(task, gormJob.UUID, gormWorker.UUID)
if err != nil {
return nil, err
}
// TODO: remove this code, and let the caller fetch the job explicitly when needed.
if taskRow.Task.JobID > 0 {
dbJob, err := queries.FetchJobByID(ctx, taskRow.Task.JobID)
if err != nil {
return nil, jobError(err, "fetching job of task %s", taskUUID)
// Put the Job & Worker into the Task.
if gormJob.ID > 0 {
gormTask.Job = &gormJob
gormTask.JobUUID = gormJob.UUID
}
if gormWorker.ID > 0 {
gormTask.Worker = &gormWorker
gormTask.WorkerUUID = gormWorker.UUID
}
convertedJob, err := convertSqlcJob(dbJob)
if err != nil {
return nil, jobError(err, "converting job of task %s", taskUUID)
}
convertedTask.Job = convertedJob
if convertedTask.JobUUID != convertedJob.UUID {
panic("Conversion to SQLC is incomplete")
}
}
// TODO: remove this code, and let the caller fetch the Worker explicitly when needed.
if taskRow.WorkerUUID.Valid {
worker, err := queries.FetchWorkerUnconditional(ctx, taskRow.WorkerUUID.String)
if err != nil {
return nil, taskError(err, "fetching worker assigned to task %s", taskUUID)
}
convertedWorker := convertSqlcWorker(worker)
convertedTask.Worker = &convertedWorker
}
return convertedTask, nil
return gormTask, nil
}
// FetchTaskJobUUID fetches the job UUID of the given task.
@ -1002,7 +1104,7 @@ func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, err
// expected by the rest of the code. This is mostly in place to aid in the GORM
// to SQLC migration. It is intended that eventually the rest of the code will
// use the same SQLC-generated model.
func convertSqlcJob(job sqlc.Job) (*Job, error) {
func convertSqlcJob(job sqlc.Job) (Job, error) {
dbJob := Job{
Model: Model{
ID: uint(job.ID),
@ -1022,11 +1124,11 @@ func convertSqlcJob(job sqlc.Job) (*Job, error) {
}
if err := json.Unmarshal(job.Settings, &dbJob.Settings); err != nil {
return nil, jobError(err, fmt.Sprintf("job %s has invalid settings: %v", job.UUID, err))
return Job{}, jobError(err, fmt.Sprintf("job %s has invalid settings: %v", job.UUID, err))
}
if err := json.Unmarshal(job.Metadata, &dbJob.Metadata); err != nil {
return nil, jobError(err, fmt.Sprintf("job %s has invalid metadata: %v", job.UUID, err))
return Job{}, jobError(err, fmt.Sprintf("job %s has invalid metadata: %v", job.UUID, err))
}
if job.WorkerTagID.Valid {
@ -1034,7 +1136,7 @@ func convertSqlcJob(job sqlc.Job) (*Job, error) {
dbJob.WorkerTagID = &workerTagID
}
return &dbJob, nil
return dbJob, nil
}
// convertSqlcTask converts a FetchTaskRow from the SQLC-generated model to the
@ -1085,3 +1187,12 @@ func convertTaskStatuses(taskStatuses []api.TaskStatus) []string {
}
return statusesAsStrings
}
// convertJobStatuses converts from []api.JobStatus to []string for feeding to sqlc.
func convertJobStatuses(jobStatuses []api.JobStatus) []string {
statusesAsStrings := make([]string, len(jobStatuses))
for index := range jobStatuses {
statusesAsStrings[index] = string(jobStatuses[index])
}
return statusesAsStrings
}

View File

@ -1,7 +1,8 @@
-- name: CreateJob :exec
-- name: CreateJob :execlastid
INSERT INTO jobs (
created_at,
updated_at,
uuid,
name,
job_type,
@ -10,9 +11,49 @@ INSERT INTO jobs (
activity,
settings,
metadata,
storage_shaman_checkout_id
storage_shaman_checkout_id,
worker_tag_id
)
VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? );
VALUES (
@created_at,
@created_at,
@uuid,
@name,
@job_type,
@priority,
@status,
@activity,
@settings,
@metadata,
@storage_shaman_checkout_id,
@worker_tag_id
);
-- name: CreateTask :execlastid
INSERT INTO tasks (
created_at,
updated_at,
uuid,
name,
type,
job_id,
priority,
status,
commands
) VALUES (
@created_at,
@created_at,
@uuid,
@name,
@type,
@job_id,
@priority,
@status,
@commands
);
-- name: StoreTaskDependency :exec
INSERT INTO task_dependencies (task_id, dependency_id) VALUES (@task_id, @dependency_id);
-- name: FetchJob :one
-- Fetch a job by its UUID.

View File

@ -63,9 +63,10 @@ func (q *Queries) CountWorkersFailingTask(ctx context.Context, taskID int64) (in
return num_failed, err
}
const createJob = `-- name: CreateJob :exec
const createJob = `-- name: CreateJob :execlastid
INSERT INTO jobs (
created_at,
updated_at,
uuid,
name,
job_type,
@ -74,9 +75,23 @@ INSERT INTO jobs (
activity,
settings,
metadata,
storage_shaman_checkout_id
storage_shaman_checkout_id,
worker_tag_id
)
VALUES (
?1,
?1,
?2,
?3,
?4,
?5,
?6,
?7,
?8,
?9,
?10,
?11
)
VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )
`
type CreateJobParams struct {
@ -90,10 +105,11 @@ type CreateJobParams struct {
Settings json.RawMessage
Metadata json.RawMessage
StorageShamanCheckoutID string
WorkerTagID sql.NullInt64
}
func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error {
_, err := q.db.ExecContext(ctx, createJob,
func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) (int64, error) {
result, err := q.db.ExecContext(ctx, createJob,
arg.CreatedAt,
arg.UUID,
arg.Name,
@ -104,8 +120,64 @@ func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error {
arg.Settings,
arg.Metadata,
arg.StorageShamanCheckoutID,
arg.WorkerTagID,
)
return err
if err != nil {
return 0, err
}
return result.LastInsertId()
}
const createTask = `-- name: CreateTask :execlastid
INSERT INTO tasks (
created_at,
updated_at,
uuid,
name,
type,
job_id,
priority,
status,
commands
) VALUES (
?1,
?1,
?2,
?3,
?4,
?5,
?6,
?7,
?8
)
`
type CreateTaskParams struct {
CreatedAt time.Time
UUID string
Name string
Type string
JobID int64
Priority int64
Status string
Commands json.RawMessage
}
func (q *Queries) CreateTask(ctx context.Context, arg CreateTaskParams) (int64, error) {
result, err := q.db.ExecContext(ctx, createTask,
arg.CreatedAt,
arg.UUID,
arg.Name,
arg.Type,
arg.JobID,
arg.Priority,
arg.Status,
arg.Commands,
)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
const deleteJob = `-- name: DeleteJob :exec
@ -805,6 +877,20 @@ func (q *Queries) SetLastRendered(ctx context.Context, arg SetLastRenderedParams
return err
}
const storeTaskDependency = `-- name: StoreTaskDependency :exec
INSERT INTO task_dependencies (task_id, dependency_id) VALUES (?1, ?2)
`
type StoreTaskDependencyParams struct {
TaskID int64
DependencyID int64
}
func (q *Queries) StoreTaskDependency(ctx context.Context, arg StoreTaskDependencyParams) error {
_, err := q.db.ExecContext(ctx, storeTaskDependency, arg.TaskID, arg.DependencyID)
return err
}
const taskAssignToWorker = `-- name: TaskAssignToWorker :exec
UPDATE tasks SET
updated_at = ?1,

View File

@ -0,0 +1,53 @@
-- name: FetchAssignedAndRunnableTaskOfWorker :one
-- Fetch a task that's assigned to this worker, and is in a runnable state.
SELECT sqlc.embed(tasks)
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE tasks.status=@active_task_status
AND tasks.worker_id=@worker_id
AND jobs.status IN (sqlc.slice('active_job_statuses'))
LIMIT 1;
-- name: FindRunnableTask :one
-- Find a task to be run by a worker. This is the core of the task scheduler.
--
-- Note that this query doesn't check for the assigned worker. Tasks that have a
-- 'schedulable' status might have been assigned to a worker, representing the
-- last worker to touch it -- it's not meant to indicate "ownership" of the
-- task.
--
-- The order in the WHERE clause is important, slices should come last. See
-- https://github.com/sqlc-dev/sqlc/issues/2452 for more info.
SELECT sqlc.embed(tasks)
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
LEFT JOIN task_failures TF ON tasks.id = TF.task_id AND TF.worker_id=@worker_id
WHERE TF.worker_id IS NULL -- Not failed by this worker before.
AND tasks.id NOT IN (
-- Find all tasks IDs that have incomplete dependencies. These are not runnable.
SELECT tasks_incomplete.id
FROM tasks AS tasks_incomplete
INNER JOIN task_dependencies td ON tasks_incomplete.id = td.task_id
INNER JOIN tasks dep ON dep.id = td.dependency_id
WHERE dep.status != @task_status_completed
)
AND tasks.type NOT IN (
SELECT task_type
FROM job_blocks
WHERE job_blocks.worker_id = @worker_id
AND job_blocks.job_id = jobs.id
)
AND (
jobs.worker_tag_id IS NULL
OR jobs.worker_tag_id IN (sqlc.slice('worker_tags')))
AND tasks.status IN (sqlc.slice('schedulable_task_statuses'))
AND jobs.status IN (sqlc.slice('schedulable_job_statuses'))
AND tasks.type IN (sqlc.slice('supported_task_types'))
ORDER BY jobs.priority DESC, tasks.priority DESC;
-- name: AssignTaskToWorker :exec
UPDATE tasks
SET worker_id=@worker_id, last_touched_at=@now, updated_at=@now
WHERE tasks.id=@task_id;

View File

@ -0,0 +1,191 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.26.0
// source: query_task_scheduler.sql
package sqlc
import (
"context"
"database/sql"
"strings"
)
const assignTaskToWorker = `-- name: AssignTaskToWorker :exec
UPDATE tasks
SET worker_id=?1, last_touched_at=?2, updated_at=?2
WHERE tasks.id=?3
`
type AssignTaskToWorkerParams struct {
WorkerID sql.NullInt64
Now sql.NullTime
TaskID int64
}
func (q *Queries) AssignTaskToWorker(ctx context.Context, arg AssignTaskToWorkerParams) error {
_, err := q.db.ExecContext(ctx, assignTaskToWorker, arg.WorkerID, arg.Now, arg.TaskID)
return err
}
const fetchAssignedAndRunnableTaskOfWorker = `-- name: FetchAssignedAndRunnableTaskOfWorker :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE tasks.status=?1
AND tasks.worker_id=?2
AND jobs.status IN (/*SLICE:active_job_statuses*/?)
LIMIT 1
`
type FetchAssignedAndRunnableTaskOfWorkerParams struct {
ActiveTaskStatus string
WorkerID sql.NullInt64
ActiveJobStatuses []string
}
type FetchAssignedAndRunnableTaskOfWorkerRow struct {
Task Task
}
// Fetch a task that's assigned to this worker, and is in a runnable state.
func (q *Queries) FetchAssignedAndRunnableTaskOfWorker(ctx context.Context, arg FetchAssignedAndRunnableTaskOfWorkerParams) (FetchAssignedAndRunnableTaskOfWorkerRow, error) {
query := fetchAssignedAndRunnableTaskOfWorker
var queryParams []interface{}
queryParams = append(queryParams, arg.ActiveTaskStatus)
queryParams = append(queryParams, arg.WorkerID)
if len(arg.ActiveJobStatuses) > 0 {
for _, v := range arg.ActiveJobStatuses {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:active_job_statuses*/?", strings.Repeat(",?", len(arg.ActiveJobStatuses))[1:], 1)
} else {
query = strings.Replace(query, "/*SLICE:active_job_statuses*/?", "NULL", 1)
}
row := q.db.QueryRowContext(ctx, query, queryParams...)
var i FetchAssignedAndRunnableTaskOfWorkerRow
err := row.Scan(
&i.Task.ID,
&i.Task.CreatedAt,
&i.Task.UpdatedAt,
&i.Task.UUID,
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
&i.Task.LastTouchedAt,
&i.Task.Commands,
&i.Task.Activity,
)
return i, err
}
const findRunnableTask = `-- name: FindRunnableTask :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
LEFT JOIN task_failures TF ON tasks.id = TF.task_id AND TF.worker_id=?1
WHERE TF.worker_id IS NULL -- Not failed by this worker before.
AND tasks.id NOT IN (
-- Find all tasks IDs that have incomplete dependencies. These are not runnable.
SELECT tasks_incomplete.id
FROM tasks AS tasks_incomplete
INNER JOIN task_dependencies td ON tasks_incomplete.id = td.task_id
INNER JOIN tasks dep ON dep.id = td.dependency_id
WHERE dep.status != ?2
)
AND tasks.type NOT IN (
SELECT task_type
FROM job_blocks
WHERE job_blocks.worker_id = ?1
AND job_blocks.job_id = jobs.id
)
AND (
jobs.worker_tag_id IS NULL
OR jobs.worker_tag_id IN (/*SLICE:worker_tags*/?))
AND tasks.status IN (/*SLICE:schedulable_task_statuses*/?)
AND jobs.status IN (/*SLICE:schedulable_job_statuses*/?)
AND tasks.type IN (/*SLICE:supported_task_types*/?)
ORDER BY jobs.priority DESC, tasks.priority DESC
`
type FindRunnableTaskParams struct {
WorkerID int64
TaskStatusCompleted string
WorkerTags []sql.NullInt64
SchedulableTaskStatuses []string
SchedulableJobStatuses []string
SupportedTaskTypes []string
}
type FindRunnableTaskRow struct {
Task Task
}
// Find a task to be run by a worker. This is the core of the task scheduler.
//
// Note that this query doesn't check for the assigned worker. Tasks that have a
// 'schedulable' status might have been assigned to a worker, representing the
// last worker to touch it -- it's not meant to indicate "ownership" of the
// task.
//
// The order in the WHERE clause is important, slices should come last. See
// https://github.com/sqlc-dev/sqlc/issues/2452 for more info.
func (q *Queries) FindRunnableTask(ctx context.Context, arg FindRunnableTaskParams) (FindRunnableTaskRow, error) {
query := findRunnableTask
var queryParams []interface{}
queryParams = append(queryParams, arg.WorkerID)
queryParams = append(queryParams, arg.TaskStatusCompleted)
if len(arg.WorkerTags) > 0 {
for _, v := range arg.WorkerTags {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:worker_tags*/?", strings.Repeat(",?", len(arg.WorkerTags))[1:], 1)
} else {
query = strings.Replace(query, "/*SLICE:worker_tags*/?", "NULL", 1)
}
if len(arg.SchedulableTaskStatuses) > 0 {
for _, v := range arg.SchedulableTaskStatuses {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:schedulable_task_statuses*/?", strings.Repeat(",?", len(arg.SchedulableTaskStatuses))[1:], 1)
} else {
query = strings.Replace(query, "/*SLICE:schedulable_task_statuses*/?", "NULL", 1)
}
if len(arg.SchedulableJobStatuses) > 0 {
for _, v := range arg.SchedulableJobStatuses {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:schedulable_job_statuses*/?", strings.Repeat(",?", len(arg.SchedulableJobStatuses))[1:], 1)
} else {
query = strings.Replace(query, "/*SLICE:schedulable_job_statuses*/?", "NULL", 1)
}
if len(arg.SupportedTaskTypes) > 0 {
for _, v := range arg.SupportedTaskTypes {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:supported_task_types*/?", strings.Repeat(",?", len(arg.SupportedTaskTypes))[1:], 1)
} else {
query = strings.Replace(query, "/*SLICE:supported_task_types*/?", "NULL", 1)
}
row := q.db.QueryRowContext(ctx, query, queryParams...)
var i FindRunnableTaskRow
err := row.Scan(
&i.Task.ID,
&i.Task.CreatedAt,
&i.Task.UpdatedAt,
&i.Task.UUID,
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
&i.Task.LastTouchedAt,
&i.Task.Commands,
&i.Task.Activity,
)
return i, err
}

View File

@ -49,6 +49,10 @@ SELECT * FROM workers WHERE workers.uuid = @uuid and deleted_at is NULL;
-- FetchWorkerUnconditional ignores soft-deletion status and just returns the worker.
SELECT * FROM workers WHERE workers.uuid = @uuid;
-- name: FetchWorkerUnconditionalByID :one
-- FetchWorkerUnconditional ignores soft-deletion status and just returns the worker.
SELECT * FROM workers WHERE workers.id = @worker_id;
-- name: FetchWorkerTags :many
SELECT worker_tags.*
FROM worker_tags
@ -56,6 +60,11 @@ LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id)
LEFT JOIN workers on (m.worker_id = workers.id)
WHERE workers.uuid = @uuid;
-- name: FetchWorkerTagByUUID :one
SELECT sqlc.embed(worker_tags)
FROM worker_tags
WHERE worker_tags.uuid = @uuid;
-- name: SoftDeleteWorker :execrows
UPDATE workers SET deleted_at=@deleted_at
WHERE uuid=@uuid;

View File

@ -129,6 +129,30 @@ func (q *Queries) FetchWorker(ctx context.Context, uuid string) (Worker, error)
return i, err
}
const fetchWorkerTagByUUID = `-- name: FetchWorkerTagByUUID :one
SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description
FROM worker_tags
WHERE worker_tags.uuid = ?1
`
type FetchWorkerTagByUUIDRow struct {
WorkerTag WorkerTag
}
func (q *Queries) FetchWorkerTagByUUID(ctx context.Context, uuid string) (FetchWorkerTagByUUIDRow, error) {
row := q.db.QueryRowContext(ctx, fetchWorkerTagByUUID, uuid)
var i FetchWorkerTagByUUIDRow
err := row.Scan(
&i.WorkerTag.ID,
&i.WorkerTag.CreatedAt,
&i.WorkerTag.UpdatedAt,
&i.WorkerTag.UUID,
&i.WorkerTag.Name,
&i.WorkerTag.Description,
)
return i, err
}
const fetchWorkerTags = `-- name: FetchWorkerTags :many
SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description
FROM worker_tags
@ -196,6 +220,35 @@ func (q *Queries) FetchWorkerUnconditional(ctx context.Context, uuid string) (Wo
return i, err
}
const fetchWorkerUnconditionalByID = `-- name: FetchWorkerUnconditionalByID :one
SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.id = ?1
`
// FetchWorkerUnconditional ignores soft-deletion status and just returns the worker.
func (q *Queries) FetchWorkerUnconditionalByID(ctx context.Context, workerID int64) (Worker, error) {
row := q.db.QueryRowContext(ctx, fetchWorkerUnconditionalByID, workerID)
var i Worker
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.UUID,
&i.Secret,
&i.Name,
&i.Address,
&i.Platform,
&i.Software,
&i.Status,
&i.LastSeenAt,
&i.StatusRequested,
&i.LazyStatusRequest,
&i.SupportedTaskTypes,
&i.DeletedAt,
&i.CanRestart,
)
return i, err
}
const fetchWorkers = `-- name: FetchWorkers :many
SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers
WHERE deleted_at IS NULL

View File

@ -4,11 +4,15 @@ package persistence
import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -26,149 +30,139 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) {
logger := log.With().Str("worker", w.UUID).Logger()
logger.Trace().Msg("finding task for worker")
hasWorkerTags, err := db.HasWorkerTags(ctx)
// Run all queries in a single transaction.
//
// After this point, all queries should use this transaction. Otherwise SQLite
// will deadlock, as it will make any other query wait until this transaction
// is done.
qtx, err := db.queriesWithTX()
if err != nil {
return nil, err
}
// Run two queries in one transaction:
// 1. find task, and
// 2. assign the task to the worker.
var task *Task
txErr := db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var err error
task, err = findTaskForWorker(tx, w, hasWorkerTags)
defer qtx.rollback()
task, err := db.scheduleTask(ctx, qtx.queries, w, logger)
if err != nil {
if isDatabaseBusyError(err) {
logger.Trace().Err(err).Msg("database busy while finding task for worker")
return errDatabaseBusy
}
logger.Error().Err(err).Msg("finding task for worker")
return fmt.Errorf("finding task for worker: %w", err)
return nil, err
}
if task == nil {
// No task found, which is fine.
return nil
// No task means no changes to the database.
// It's fine to just roll back the transaction.
return nil, nil
}
// Found a task, now assign it to the requesting worker.
if err := assignTaskToWorker(tx, w, task); err != nil {
if isDatabaseBusyError(err) {
gormTask, err := convertSqlTaskWithJobAndWorker(ctx, qtx.queries, *task)
if err != nil {
return nil, err
}
if err := qtx.commit(); err != nil {
return nil, fmt.Errorf(
"could not commit database transaction after scheduling task %s for worker %s: %w",
task.UUID, w.UUID, err)
}
return gormTask, nil
}
func (db *DB) scheduleTask(ctx context.Context, queries *sqlc.Queries, w *Worker, logger zerolog.Logger) (*sqlc.Task, error) {
if w.ID == 0 {
panic("worker should be in database, but has zero ID")
}
workerID := sql.NullInt64{Int64: int64(w.ID), Valid: true}
// If a task is alreay active & assigned to this worker, return just that.
// Note that this task type could be blocklisted or no longer supported by the
// Worker, but since it's active that is unlikely.
{
row, err := queries.FetchAssignedAndRunnableTaskOfWorker(ctx, sqlc.FetchAssignedAndRunnableTaskOfWorkerParams{
ActiveTaskStatus: string(api.TaskStatusActive),
ActiveJobStatuses: convertJobStatuses(schedulableJobStatuses),
WorkerID: workerID,
})
switch {
case errors.Is(err, sql.ErrNoRows):
// Fine, just means there was no task assigned yet.
case err != nil:
return nil, err
case row.Task.ID > 0:
return &row.Task, nil
}
}
task, err := findTaskForWorker(ctx, queries, w)
switch {
case errors.Is(err, sql.ErrNoRows):
// Fine, just means there was no task assigned yet.
return nil, nil
case isDatabaseBusyError(err):
logger.Trace().Err(err).Msg("database busy while finding task for worker")
return nil, errDatabaseBusy
case err != nil:
logger.Error().Err(err).Msg("finding task for worker")
return nil, fmt.Errorf("finding task for worker: %w", err)
}
// Assign the task to the worker.
err = queries.AssignTaskToWorker(ctx, sqlc.AssignTaskToWorkerParams{
WorkerID: workerID,
Now: db.now(),
TaskID: task.ID,
})
switch {
case isDatabaseBusyError(err):
logger.Trace().Err(err).Msg("database busy while assigning task to worker")
return errDatabaseBusy
}
return nil, errDatabaseBusy
case err != nil:
logger.Warn().
Str("taskID", task.UUID).
Err(err).
Msg("assigning task to worker")
return fmt.Errorf("assigning task to worker: %w", err)
return nil, fmt.Errorf("assigning task to worker: %w", err)
}
return nil
})
if txErr != nil {
return nil, txErr
}
if task == nil {
logger.Debug().Msg("no task for worker")
return nil, nil
}
// Make sure the returned task matches the database.
task.WorkerID = workerID
logger.Info().
Str("taskID", task.UUID).
Msg("assigned task to worker")
return task, nil
}
func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerTags bool) (*Task, error) {
task := Task{}
// If a task is alreay active & assigned to this worker, return just that.
// Note that this task type could be blocklisted or no longer supported by the
// Worker, but since it's active that is unlikely.
assignedTaskResult := taskAssignedAndRunnableQuery(tx.Model(&task), w).
Preload("Job").
Find(&task)
if assignedTaskResult.Error != nil {
return nil, assignedTaskResult.Error
}
if assignedTaskResult.RowsAffected > 0 {
return &task, nil
}
// Produce the 'current task ID' by selecting all its incomplete dependencies.
// This can then be used in a subquery to filter out such tasks.
// `tasks.id` is the task ID from the outer query.
incompleteDepsQuery := tx.Table("tasks as tasks2").
Select("tasks2.id").
Joins("left join task_dependencies td on tasks2.id = td.task_id").
Joins("left join tasks dep on dep.id = td.dependency_id").
Where("tasks2.id = tasks.id").
Where("dep.status is not NULL and dep.status != ?", api.TaskStatusCompleted)
func findTaskForWorker(
ctx context.Context,
queries *sqlc.Queries,
w *Worker,
) (sqlc.Task, error) {
blockedTaskTypesQuery := tx.Model(&JobBlock{}).
Select("job_blocks.task_type").
Where("job_blocks.worker_id = ?", w.ID).
Where("job_blocks.job_id = jobs.id")
// Note that this query doesn't check for the assigned worker. Tasks that have
// a 'schedulable' status might have been assigned to a worker, representing
// the last worker to touch it -- it's not meant to indicate "ownership" of
// the task.
findTaskQuery := tx.Model(&task).
Joins("left join jobs on tasks.job_id = jobs.id").
Joins("left join task_failures TF on tasks.id = TF.task_id and TF.worker_id=?", w.ID).
Where("tasks.status in ?", schedulableTaskStatuses). // Schedulable task statuses
Where("jobs.status in ?", schedulableJobStatuses). // Schedulable job statuses
Where("tasks.type in ?", w.TaskTypes()). // Supported task types
Where("tasks.id not in (?)", incompleteDepsQuery). // Dependencies completed
Where("TF.worker_id is NULL"). // Not failed before
Where("tasks.type not in (?)", blockedTaskTypesQuery) // Non-blocklisted
if checkWorkerTags {
// The system has one or more tags, so limit the available jobs to those
// that have no tag, or overlap with the Worker's tags.
if len(w.Tags) == 0 {
// Tagless workers only get tagless jobs.
findTaskQuery = findTaskQuery.
Where("jobs.worker_tag_id is NULL")
} else {
// Taged workers get tagless jobs AND jobs of their own tags.
tagIDs := []uint{}
for _, tag := range w.Tags {
tagIDs = append(tagIDs, tag.ID)
}
findTaskQuery = findTaskQuery.
Where("jobs.worker_tag_id is NULL or worker_tag_id in ?", tagIDs)
}
// Construct the list of worker tags to check.
workerTags := make([]sql.NullInt64, len(w.Tags))
for index, tag := range w.Tags {
workerTags[index] = sql.NullInt64{Int64: int64(tag.ID), Valid: true}
}
findTaskResult := findTaskQuery.
Order("jobs.priority desc"). // Highest job priority
Order("tasks.priority desc"). // Highest task priority
Limit(1).
Preload("Job").
Find(&task)
if findTaskResult.Error != nil {
return nil, findTaskResult.Error
row, err := queries.FindRunnableTask(ctx, sqlc.FindRunnableTaskParams{
WorkerID: int64(w.ID),
SchedulableTaskStatuses: convertTaskStatuses(schedulableTaskStatuses),
SchedulableJobStatuses: convertJobStatuses(schedulableJobStatuses),
SupportedTaskTypes: w.TaskTypes(),
TaskStatusCompleted: string(api.TaskStatusCompleted),
WorkerTags: workerTags,
})
if err != nil {
return sqlc.Task{}, err
}
if task.ID == 0 {
// No task fetched, which doesn't result in an error with Limt(1).Find(&task).
return nil, nil
if row.Task.ID == 0 {
return sqlc.Task{}, nil
}
return &task, nil
}
func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error {
return tx.Model(t).
Select("WorkerID", "LastTouchedAt").
Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error
return row.Task, nil
}
// taskAssignedAndRunnableQuery appends some GORM clauses to query for a task

View File

@ -43,25 +43,18 @@ func TestOneJobOneTask(t *testing.T) {
require.NoError(t, err)
// Check the returned task.
if task == nil {
t.Fatal("task is nil")
}
require.NotNil(t, task)
assert.Equal(t, job.ID, task.JobID)
if task.WorkerID == nil {
t.Fatal("no worker assigned to task")
}
require.NotNil(t, task.WorkerID, "no worker assigned to returned task")
assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker")
// Check the task in the database.
now := db.gormDB.NowFunc()
dbTask, err := db.FetchTask(context.Background(), authTask.UUID)
require.NoError(t, err)
if dbTask == nil {
t.Fatal("task cannot be fetched from database")
}
if dbTask.WorkerID == nil {
t.Fatal("no worker assigned to task")
}
require.NotNil(t, dbTask)
require.NotNil(t, dbTask.WorkerID, "no worker assigned to task in database")
assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker")
assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second, "task must be 'touched' by the worker after scheduling")
}
@ -85,14 +78,10 @@ func TestOneJobThreeTasksByPrio(t *testing.T) {
task, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
require.NotNil(t, task)
assert.Equal(t, job.ID, task.JobID)
if task.Job == nil {
t.Fatal("task.Job is nil")
}
assert.NotNil(t, task.Job)
assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen")
}
@ -116,9 +105,7 @@ func TestOneJobThreeTasksByDependencies(t *testing.T) {
task, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
require.NotNil(t, task)
assert.Equal(t, job.ID, task.JobID)
assert.Equal(t, att1.Name, task.Name, "the first task should have been chosen")
}
@ -156,13 +143,72 @@ func TestTwoJobsThreeTasks(t *testing.T) {
task, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
require.NotNil(t, task)
assert.Equal(t, job2.ID, task.JobID)
assert.Equal(t, att2_3.Name, task.Name, "the 3rd task of the 2nd job should have been chosen")
}
// TestFanOutFanIn tests one starting task, then multiple tasks that depend on
// it that can run in parallel (fan-out), then one task that depends on all the
// parallel tasks (fan-in), and finally one last task that depends on the fan-in
// task.
func TestFanOutFanIn(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(schedulerTestTimeout)
defer cancel()
w := linuxWorker(t, db)
// Single start task.
task1 := authorTestTask("1 start", "blender")
// Fan out.
task2_1 := authorTestTask("2.1 parallel", "blender")
task2_1.Dependencies = []*job_compilers.AuthoredTask{&task1}
task2_2 := authorTestTask("2.2 parallel", "blender")
task2_2.Dependencies = []*job_compilers.AuthoredTask{&task1}
task2_3 := authorTestTask("2.3 parallel", "blender")
task2_3.Dependencies = []*job_compilers.AuthoredTask{&task1}
// Fan in.
task3 := authorTestTask("3 fan-in", "blender")
task3.Dependencies = []*job_compilers.AuthoredTask{&task2_1, &task2_2, &task2_3}
// Final task.
task4 := authorTestTask("4 final", "ffmpeg")
task4.Dependencies = []*job_compilers.AuthoredTask{&task3}
// Construct the job, with the tasks not in execution order, to root out
// potential issues with the dependency resolution.
atj := authorTestJob(
"92e75ecf-7d2a-461c-8443-2fbe6a8b559d",
"fan-out-fan-in",
task4, task3, task2_1, task2_2, task1, task2_3)
require.NotNil(t, constructTestJob(ctx, t, db, atj))
// Check the order in which tasks are handed out.
executionOrder := []string{} // Slice of task names.
for index := range 6 {
task, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
require.NotNil(t, task, "task #%d is nil", index)
executionOrder = append(executionOrder, task.Name)
// Fake that the task has been completed by the worker.
task.Status = api.TaskStatusCompleted
require.NoError(t, db.SaveTaskStatus(ctx, task))
}
expectedOrder := []string{
"1 start",
"2.1 parallel",
"2.2 parallel",
"2.3 parallel",
"3 fan-in",
"4 final",
}
assert.Equal(t, expectedOrder, executionOrder)
}
func TestSomeButNotAllDependenciesCompleted(t *testing.T) {
// There was a bug in the task scheduler query, where it would schedule a task
// if any of its dependencies was completed (instead of all dependencies).
@ -218,9 +264,7 @@ func TestAlreadyAssigned(t *testing.T) {
task, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
require.NotNil(t, task)
assert.Equal(t, att3.Name, task.Name, "the already-assigned task should have been chosen")
}
@ -253,9 +297,7 @@ func TestAssignedToOtherWorker(t *testing.T) {
task, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
require.NotNil(t, task)
assert.Equal(t, att2.Name, task.Name, "the high-prio task should have been chosen")
assert.Equal(t, *task.WorkerID, w.ID, "the task should now be assigned to the worker it was scheduled for")
@ -285,9 +327,7 @@ func TestPreviouslyFailed(t *testing.T) {
// This should assign the 2nd task.
task, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
require.NotNil(t, task)
assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen")
}
@ -396,9 +436,7 @@ func TestBlocklisted(t *testing.T) {
// This should assign the 2nd task.
task, err := db.ScheduleTask(ctx, &w)
require.NoError(t, err)
if task == nil {
t.Fatal("task is nil")
}
require.NotNil(t, task)
assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen")
}

View File

@ -37,6 +37,15 @@ func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time)
if tx.Error != nil {
return nil, taskError(tx.Error, "finding timed out tasks (untouched since %s)", untouchedSince.String())
}
// GORM apparently doesn't call the task's AfterFind() function for the above query.
for _, task := range result {
err := task.AfterFind(tx)
if err != nil {
return nil, taskError(tx.Error, "finding the job & worker UUIDs for task %s", task.UUID)
}
}
return result, nil
}

View File

@ -34,3 +34,20 @@ sql:
jobuuid: "JobUUID"
taskUUID: "TaskUUID"
workeruuid: "WorkerUUID"
- engine: "sqlite"
schema: "internal/manager/persistence/sqlc/schema.sql"
queries: "internal/manager/persistence/sqlc/query_task_scheduler.sql"
gen:
go:
out: "internal/manager/persistence/sqlc"
overrides:
- db_type: "jsonb"
go_type:
import: "encoding/json"
type: "RawMessage"
rename:
uuid: "UUID"
uuids: "UUIDs"
jobuuid: "JobUUID"
taskUUID: "TaskUUID"
workeruuid: "WorkerUUID"

View File

@ -55,7 +55,7 @@ class ApiClient {
* @default {}
*/
this.defaultHeaders = {
'User-Agent': 'Flamenco/3.6-alpha0 / webbrowser'
'User-Agent': 'Flamenco/3.6-alpha3 / webbrowser'
};
/**

View File

@ -1,2 +1,2 @@
latestVersion: "3.5"
latestExperimentalVersion: "3.6-alpha0"
latestExperimentalVersion: "3.6-alpha2"