Rename worker clusters to tags #104223
@ -8,7 +8,7 @@ bugs in actually-released versions.
|
||||
|
||||
- Improve speed of queueing up >100 simultaneous job deletions.
|
||||
- Improve logging of job deletion.
|
||||
- Add Worker Cluster support. Workers can be members of any number of clusters. Workers will only work on jobs that are assigned to that cluster. Jobs that do not have a cluster will be available to all workers, regardless of their cluster assignment. As a result, clusterless workers will only work on clusterless jobs.
|
||||
- Add Worker Tag support. Workers can be members of any number of tags. Workers will only work on jobs that are assigned to that tag. Jobs that do not have a tag will be available to all workers, regardless of their tag assignment. As a result, tagless workers will only work on tagless jobs.
|
||||
- Fix limitation where a job could have no more than 1000 tasks ([#104201](https://projects.blender.org/studio/flamenco/issues/104201))
|
||||
- Add support for finding the top-level 'project' directory. When submitting files to Flamenco, the add-on will try to retain the directory structure of your Blender project as precisely as possible. This new feature allows the add-on to find the top-level directory of your project by finding a `.blender_project`, `.git`, or `.subversion` directory. This can be configured in the add-on preferences.
|
||||
- Worker status is remembered when they sign off, so that workers when they come back online do so to the same state ([#99549](https://projects.blender.org/studio/flamenco/issues/99549)).
|
||||
|
@ -26,7 +26,7 @@ if __is_first_load:
|
||||
comms,
|
||||
preferences,
|
||||
projects,
|
||||
worker_clusters,
|
||||
worker_tags,
|
||||
)
|
||||
else:
|
||||
import importlib
|
||||
@ -37,7 +37,7 @@ else:
|
||||
comms = importlib.reload(comms)
|
||||
preferences = importlib.reload(preferences)
|
||||
projects = importlib.reload(projects)
|
||||
worker_clusters = importlib.reload(worker_clusters)
|
||||
worker_tags = importlib.reload(worker_tags)
|
||||
|
||||
import bpy
|
||||
|
||||
@ -155,7 +155,7 @@ def register() -> None:
|
||||
)
|
||||
|
||||
preferences.register()
|
||||
worker_clusters.register()
|
||||
worker_tags.register()
|
||||
operators.register()
|
||||
gui.register()
|
||||
job_types.register()
|
||||
@ -173,5 +173,5 @@ def unregister() -> None:
|
||||
job_types.unregister()
|
||||
gui.unregister()
|
||||
operators.unregister()
|
||||
worker_clusters.unregister()
|
||||
worker_tags.unregister()
|
||||
preferences.unregister()
|
||||
|
@ -43,10 +43,10 @@ class FLAMENCO_PT_job_submission(bpy.types.Panel):
|
||||
col.prop(context.scene, "flamenco_job_name", text="Job Name")
|
||||
col.prop(context.scene, "flamenco_job_priority", text="Priority")
|
||||
|
||||
# Worker cluster:
|
||||
# Worker tag:
|
||||
row = col.row(align=True)
|
||||
row.prop(context.scene, "flamenco_worker_cluster", text="Cluster")
|
||||
row.operator("flamenco.fetch_worker_clusters", text="", icon="FILE_REFRESH")
|
||||
row.prop(context.scene, "flamenco_worker_tag", text="Tag")
|
||||
row.operator("flamenco.fetch_worker_tags", text="", icon="FILE_REFRESH")
|
||||
|
||||
layout.separator()
|
||||
|
||||
|
@ -54,9 +54,9 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]:
|
||||
type_etag=propgroup.job_type.etag,
|
||||
)
|
||||
|
||||
worker_cluster: str = getattr(scene, "flamenco_worker_cluster", "")
|
||||
if worker_cluster and worker_cluster != "-":
|
||||
job.worker_cluster = worker_cluster
|
||||
worker_tag: str = getattr(scene, "flamenco_worker_tag", "")
|
||||
if worker_tag and worker_tag != "-":
|
||||
job.worker_tag = worker_tag
|
||||
|
||||
return job
|
||||
|
||||
|
@ -10,7 +10,7 @@ from urllib3.exceptions import HTTPError, MaxRetryError
|
||||
|
||||
import bpy
|
||||
|
||||
from . import job_types, job_submission, preferences, worker_clusters
|
||||
from . import job_types, job_submission, preferences, worker_tags
|
||||
from .job_types_propgroup import JobTypePropertyGroup
|
||||
from .bat.submodules import bpathlib
|
||||
|
||||
@ -83,10 +83,10 @@ class FLAMENCO_OT_fetch_job_types(FlamencoOpMixin, bpy.types.Operator):
|
||||
return {"FINISHED"}
|
||||
|
||||
|
||||
class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator):
|
||||
bl_idname = "flamenco.fetch_worker_clusters"
|
||||
bl_label = "Fetch Worker Clusters"
|
||||
bl_description = "Query Flamenco Manager to obtain the available worker clusters"
|
||||
class FLAMENCO_OT_fetch_worker_tags(FlamencoOpMixin, bpy.types.Operator):
|
||||
bl_idname = "flamenco.fetch_worker_tags"
|
||||
bl_label = "Fetch Worker Tags"
|
||||
bl_description = "Query Flamenco Manager to obtain the available worker tags"
|
||||
|
||||
def execute(self, context: bpy.types.Context) -> set[str]:
|
||||
api_client = self.get_api_client(context)
|
||||
@ -94,10 +94,10 @@ class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator):
|
||||
from flamenco.manager import ApiException
|
||||
|
||||
scene = context.scene
|
||||
old_cluster = getattr(scene, "flamenco_worker_cluster", "")
|
||||
old_tag = getattr(scene, "flamenco_worker_tag", "")
|
||||
|
||||
try:
|
||||
worker_clusters.refresh(context, api_client)
|
||||
worker_tags.refresh(context, api_client)
|
||||
except ApiException as ex:
|
||||
self.report({"ERROR"}, "Error getting job types: %s" % ex)
|
||||
return {"CANCELLED"}
|
||||
@ -107,9 +107,9 @@ class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator):
|
||||
self.report({"ERROR"}, "Unable to reach Manager")
|
||||
return {"CANCELLED"}
|
||||
|
||||
if old_cluster:
|
||||
# TODO: handle cases where the old cluster no longer exists.
|
||||
scene.flamenco_worker_cluster = old_cluster
|
||||
if old_tag:
|
||||
# TODO: handle cases where the old tag no longer exists.
|
||||
scene.flamenco_worker_tag = old_tag
|
||||
|
||||
return {"FINISHED"}
|
||||
|
||||
@ -669,7 +669,7 @@ class FLAMENCO3_OT_explore_file_path(bpy.types.Operator):
|
||||
|
||||
classes = (
|
||||
FLAMENCO_OT_fetch_job_types,
|
||||
FLAMENCO_OT_fetch_worker_clusters,
|
||||
FLAMENCO_OT_fetch_worker_tags,
|
||||
FLAMENCO_OT_ping_manager,
|
||||
FLAMENCO_OT_eval_setting,
|
||||
FLAMENCO_OT_submit_job,
|
||||
|
@ -43,7 +43,7 @@ _project_finder_enum_items = [
|
||||
]
|
||||
|
||||
|
||||
class WorkerCluster(bpy.types.PropertyGroup):
|
||||
class WorkerTag(bpy.types.PropertyGroup):
|
||||
id: bpy.props.StringProperty(name="id") # type: ignore
|
||||
name: bpy.props.StringProperty(name="Name") # type: ignore
|
||||
description: bpy.props.StringProperty(name="Description") # type: ignore
|
||||
@ -93,10 +93,10 @@ class FlamencoPreferences(bpy.types.AddonPreferences):
|
||||
get=lambda prefs: prefs.job_storage,
|
||||
)
|
||||
|
||||
worker_clusters: bpy.props.CollectionProperty( # type: ignore
|
||||
type=WorkerCluster,
|
||||
name="Worker Clusters",
|
||||
description="Cache for the worker clusters available on the configured Manager",
|
||||
worker_tags: bpy.props.CollectionProperty( # type: ignore
|
||||
type=WorkerTag,
|
||||
name="Worker Tags",
|
||||
description="Cache for the worker tags available on the configured Manager",
|
||||
options={"HIDDEN"},
|
||||
)
|
||||
|
||||
@ -169,7 +169,7 @@ def manager_url(context: bpy.types.Context) -> str:
|
||||
|
||||
|
||||
classes = (
|
||||
WorkerCluster,
|
||||
WorkerTag,
|
||||
FlamencoPreferences,
|
||||
)
|
||||
_register, _unregister = bpy.utils.register_classes_factory(classes)
|
||||
|
@ -16,25 +16,25 @@ _enum_items: list[Union[tuple[str, str, str], tuple[str, str, str, int, int]]] =
|
||||
|
||||
|
||||
def refresh(context: bpy.types.Context, api_client: _ApiClient) -> None:
|
||||
"""Fetch the available worker clusters from the Manager."""
|
||||
"""Fetch the available worker tags from the Manager."""
|
||||
from flamenco.manager import ApiClient
|
||||
from flamenco.manager.api import worker_mgt_api
|
||||
from flamenco.manager.model.worker_cluster_list import WorkerClusterList
|
||||
from flamenco.manager.model.worker_tag_list import WorkerTagList
|
||||
|
||||
assert isinstance(api_client, ApiClient)
|
||||
|
||||
api = worker_mgt_api.WorkerMgtApi(api_client)
|
||||
response: WorkerClusterList = api.fetch_worker_clusters()
|
||||
response: WorkerTagList = api.fetch_worker_tags()
|
||||
|
||||
# Store on the preferences, so a cached version persists until the next refresh.
|
||||
prefs = preferences.get(context)
|
||||
prefs.worker_clusters.clear()
|
||||
prefs.worker_tags.clear()
|
||||
|
||||
for cluster in response.clusters:
|
||||
rna_cluster = prefs.worker_clusters.add()
|
||||
rna_cluster.id = cluster.id
|
||||
rna_cluster.name = cluster.name
|
||||
rna_cluster.description = getattr(cluster, "description", "")
|
||||
for tag in response.tags:
|
||||
rna_tag = prefs.worker_tags.add()
|
||||
rna_tag.id = tag.id
|
||||
rna_tag.name = tag.name
|
||||
rna_tag.description = getattr(tag, "description", "")
|
||||
|
||||
# Preferences have changed, so make sure that Blender saves them (assuming
|
||||
# auto-save here).
|
||||
@ -46,25 +46,25 @@ def _get_enum_items(self, context):
|
||||
prefs = preferences.get(context)
|
||||
|
||||
_enum_items = [
|
||||
("-", "All", "No specific cluster assigned, any worker can handle this job"),
|
||||
("-", "All", "No specific tag assigned, any worker can handle this job"),
|
||||
]
|
||||
_enum_items.extend(
|
||||
(cluster.id, cluster.name, cluster.description)
|
||||
for cluster in prefs.worker_clusters
|
||||
(tag.id, tag.name, tag.description)
|
||||
for tag in prefs.worker_tags
|
||||
)
|
||||
return _enum_items
|
||||
|
||||
|
||||
def register() -> None:
|
||||
bpy.types.Scene.flamenco_worker_cluster = bpy.props.EnumProperty(
|
||||
name="Worker Cluster",
|
||||
bpy.types.Scene.flamenco_worker_tag = bpy.props.EnumProperty(
|
||||
name="Worker Tag",
|
||||
items=_get_enum_items,
|
||||
description="The set of Workers that can handle tasks of this job",
|
||||
)
|
||||
|
||||
|
||||
def unregister() -> None:
|
||||
to_del = ((bpy.types.Scene, "flamenco_worker_cluster"),)
|
||||
to_del = ((bpy.types.Scene, "flamenco_worker_tag"),)
|
||||
for ob, attr in to_del:
|
||||
try:
|
||||
delattr(ob, attr)
|
@ -65,13 +65,13 @@ type PersistenceService interface {
|
||||
RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error
|
||||
ClearJobBlocklist(ctx context.Context, job *persistence.Job) error
|
||||
|
||||
// Worker cluster management.
|
||||
WorkerSetClusters(ctx context.Context, worker *persistence.Worker, clusterUUIDs []string) error
|
||||
CreateWorkerCluster(ctx context.Context, cluster *persistence.WorkerCluster) error
|
||||
FetchWorkerCluster(ctx context.Context, uuid string) (*persistence.WorkerCluster, error)
|
||||
FetchWorkerClusters(ctx context.Context) ([]*persistence.WorkerCluster, error)
|
||||
DeleteWorkerCluster(ctx context.Context, uuid string) error
|
||||
SaveWorkerCluster(ctx context.Context, cluster *persistence.WorkerCluster) error
|
||||
// Worker tag management.
|
||||
WorkerSetTags(ctx context.Context, worker *persistence.Worker, tagUUIDs []string) error
|
||||
CreateWorkerTag(ctx context.Context, tag *persistence.WorkerTag) error
|
||||
FetchWorkerTag(ctx context.Context, uuid string) (*persistence.WorkerTag, error)
|
||||
FetchWorkerTags(ctx context.Context) ([]*persistence.WorkerTag, error)
|
||||
DeleteWorkerTag(ctx context.Context, uuid string) error
|
||||
SaveWorkerTag(ctx context.Context, tag *persistence.WorkerTag) error
|
||||
|
||||
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.
|
||||
WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error)
|
||||
|
@ -618,8 +618,8 @@ func jobDBtoAPI(dbJob *persistence.Job) api.Job {
|
||||
if dbJob.DeleteRequestedAt.Valid {
|
||||
apiJob.DeleteRequestedAt = &dbJob.DeleteRequestedAt.Time
|
||||
}
|
||||
if dbJob.WorkerCluster != nil {
|
||||
apiJob.WorkerCluster = &dbJob.WorkerCluster.UUID
|
||||
if dbJob.WorkerTag != nil {
|
||||
apiJob.WorkerTag = &dbJob.WorkerTag.UUID
|
||||
}
|
||||
|
||||
return apiJob
|
||||
|
@ -320,19 +320,19 @@ func TestSubmitJobWithShamanCheckoutID(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestSubmitJobWithWorkerCluster(t *testing.T) {
|
||||
func TestSubmitJobWithWorkerTag(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mf := newMockedFlamenco(mockCtrl)
|
||||
worker := testWorker()
|
||||
|
||||
workerClusterUUID := "04435762-9dc8-4f13-80b7-643a6fa5b6fd"
|
||||
cluster := persistence.WorkerCluster{
|
||||
workerTagUUID := "04435762-9dc8-4f13-80b7-643a6fa5b6fd"
|
||||
tag := persistence.WorkerTag{
|
||||
Model: persistence.Model{ID: 47},
|
||||
UUID: workerClusterUUID,
|
||||
Name: "first cluster",
|
||||
Description: "my first cluster",
|
||||
UUID: workerTagUUID,
|
||||
Name: "first tag",
|
||||
Description: "my first tag",
|
||||
}
|
||||
|
||||
submittedJob := api.SubmittedJob{
|
||||
@ -340,7 +340,7 @@ func TestSubmitJobWithWorkerCluster(t *testing.T) {
|
||||
Type: "test",
|
||||
Priority: 50,
|
||||
SubmitterPlatform: worker.Platform,
|
||||
WorkerCluster: &workerClusterUUID,
|
||||
WorkerTag: &workerTagUUID,
|
||||
}
|
||||
|
||||
mf.expectConvertTwoWayVariables(t,
|
||||
@ -352,7 +352,7 @@ func TestSubmitJobWithWorkerCluster(t *testing.T) {
|
||||
// Expect the job compiler to be called.
|
||||
authoredJob := job_compilers.AuthoredJob{
|
||||
JobID: "afc47568-bd9d-4368-8016-e91d945db36d",
|
||||
WorkerClusterUUID: workerClusterUUID,
|
||||
WorkerTagUUID: workerTagUUID,
|
||||
|
||||
Name: submittedJob.Name,
|
||||
JobType: submittedJob.Type,
|
||||
@ -382,8 +382,8 @@ func TestSubmitJobWithWorkerCluster(t *testing.T) {
|
||||
Settings: persistence.StringInterfaceMap{},
|
||||
Metadata: persistence.StringStringMap{},
|
||||
|
||||
WorkerClusterID: &cluster.ID,
|
||||
WorkerCluster: &cluster,
|
||||
WorkerTagID: &tag.ID,
|
||||
WorkerTag: &tag,
|
||||
}
|
||||
mf.persistence.EXPECT().FetchJob(gomock.Any(), queuedJob.JobID).Return(&dbJob, nil)
|
||||
|
||||
|
@ -182,7 +182,7 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string)
|
||||
return e.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error {
|
||||
func (f *Flamenco) SetWorkerTags(e echo.Context, workerUUID string) error {
|
||||
ctx := e.Request().Context()
|
||||
logger := requestLogger(e)
|
||||
logger = logger.With().Str("worker", workerUUID).Logger()
|
||||
@ -192,7 +192,7 @@ func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error {
|
||||
}
|
||||
|
||||
// Decode the request body.
|
||||
var change api.WorkerClusterChangeRequest
|
||||
var change api.WorkerTagChangeRequest
|
||||
if err := e.Bind(&change); err != nil {
|
||||
logger.Warn().Err(err).Msg("bad request received")
|
||||
return sendAPIError(e, http.StatusBadRequest, "invalid format")
|
||||
@ -210,13 +210,13 @@ func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error {
|
||||
}
|
||||
|
||||
logger = logger.With().
|
||||
Strs("clusters", change.ClusterIds).
|
||||
Strs("tags", change.TagIds).
|
||||
Logger()
|
||||
logger.Info().Msg("worker cluster change requested")
|
||||
logger.Info().Msg("worker tag change requested")
|
||||
|
||||
// Store the new cluster assignment.
|
||||
if err := f.persist.WorkerSetClusters(ctx, dbWorker, change.ClusterIds); err != nil {
|
||||
logger.Error().Err(err).Msg("saving worker after cluster change request")
|
||||
// Store the new tag assignment.
|
||||
if err := f.persist.WorkerSetTags(ctx, dbWorker, change.TagIds); err != nil {
|
||||
logger.Error().Err(err).Msg("saving worker after tag change request")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error saving worker: %v", err)
|
||||
}
|
||||
|
||||
@ -227,155 +227,155 @@ func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error {
|
||||
return e.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (f *Flamenco) DeleteWorkerCluster(e echo.Context, clusterUUID string) error {
|
||||
func (f *Flamenco) DeleteWorkerTag(e echo.Context, tagUUID string) error {
|
||||
ctx := e.Request().Context()
|
||||
logger := requestLogger(e)
|
||||
logger = logger.With().Str("cluster", clusterUUID).Logger()
|
||||
logger = logger.With().Str("tag", tagUUID).Logger()
|
||||
|
||||
if !uuid.IsValid(clusterUUID) {
|
||||
if !uuid.IsValid(tagUUID) {
|
||||
return sendAPIError(e, http.StatusBadRequest, "not a valid UUID")
|
||||
}
|
||||
|
||||
err := f.persist.DeleteWorkerCluster(ctx, clusterUUID)
|
||||
err := f.persist.DeleteWorkerTag(ctx, tagUUID)
|
||||
switch {
|
||||
case errors.Is(err, persistence.ErrWorkerClusterNotFound):
|
||||
logger.Debug().Msg("non-existent worker cluster requested")
|
||||
return sendAPIError(e, http.StatusNotFound, "worker cluster %q not found", clusterUUID)
|
||||
case errors.Is(err, persistence.ErrWorkerTagNotFound):
|
||||
logger.Debug().Msg("non-existent worker tag requested")
|
||||
return sendAPIError(e, http.StatusNotFound, "worker tag %q not found", tagUUID)
|
||||
case err != nil:
|
||||
logger.Error().Err(err).Msg("deleting worker cluster")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error deleting worker cluster: %v", err)
|
||||
logger.Error().Err(err).Msg("deleting worker tag")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error deleting worker tag: %v", err)
|
||||
}
|
||||
|
||||
// TODO: SocketIO broadcast of cluster deletion.
|
||||
// TODO: SocketIO broadcast of tag deletion.
|
||||
|
||||
logger.Info().Msg("worker cluster deleted")
|
||||
logger.Info().Msg("worker tag deleted")
|
||||
return e.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (f *Flamenco) FetchWorkerCluster(e echo.Context, clusterUUID string) error {
|
||||
func (f *Flamenco) FetchWorkerTag(e echo.Context, tagUUID string) error {
|
||||
ctx := e.Request().Context()
|
||||
logger := requestLogger(e)
|
||||
logger = logger.With().Str("cluster", clusterUUID).Logger()
|
||||
logger = logger.With().Str("tag", tagUUID).Logger()
|
||||
|
||||
if !uuid.IsValid(clusterUUID) {
|
||||
if !uuid.IsValid(tagUUID) {
|
||||
return sendAPIError(e, http.StatusBadRequest, "not a valid UUID")
|
||||
}
|
||||
|
||||
cluster, err := f.persist.FetchWorkerCluster(ctx, clusterUUID)
|
||||
tag, err := f.persist.FetchWorkerTag(ctx, tagUUID)
|
||||
switch {
|
||||
case errors.Is(err, persistence.ErrWorkerClusterNotFound):
|
||||
logger.Debug().Msg("non-existent worker cluster requested")
|
||||
return sendAPIError(e, http.StatusNotFound, "worker cluster %q not found", clusterUUID)
|
||||
case errors.Is(err, persistence.ErrWorkerTagNotFound):
|
||||
logger.Debug().Msg("non-existent worker tag requested")
|
||||
return sendAPIError(e, http.StatusNotFound, "worker tag %q not found", tagUUID)
|
||||
case err != nil:
|
||||
logger.Error().Err(err).Msg("fetching worker cluster")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker cluster: %v", err)
|
||||
logger.Error().Err(err).Msg("fetching worker tag")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tag: %v", err)
|
||||
}
|
||||
|
||||
return e.JSON(http.StatusOK, workerClusterDBtoAPI(*cluster))
|
||||
return e.JSON(http.StatusOK, workerTagDBtoAPI(*tag))
|
||||
}
|
||||
|
||||
func (f *Flamenco) UpdateWorkerCluster(e echo.Context, clusterUUID string) error {
|
||||
func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error {
|
||||
ctx := e.Request().Context()
|
||||
logger := requestLogger(e)
|
||||
logger = logger.With().Str("cluster", clusterUUID).Logger()
|
||||
logger = logger.With().Str("tag", tagUUID).Logger()
|
||||
|
||||
if !uuid.IsValid(clusterUUID) {
|
||||
if !uuid.IsValid(tagUUID) {
|
||||
return sendAPIError(e, http.StatusBadRequest, "not a valid UUID")
|
||||
}
|
||||
|
||||
// Decode the request body.
|
||||
var update api.UpdateWorkerClusterJSONBody
|
||||
var update api.UpdateWorkerTagJSONBody
|
||||
if err := e.Bind(&update); err != nil {
|
||||
logger.Warn().Err(err).Msg("bad request received")
|
||||
return sendAPIError(e, http.StatusBadRequest, "invalid format")
|
||||
}
|
||||
|
||||
dbCluster, err := f.persist.FetchWorkerCluster(ctx, clusterUUID)
|
||||
dbTag, err := f.persist.FetchWorkerTag(ctx, tagUUID)
|
||||
switch {
|
||||
case errors.Is(err, persistence.ErrWorkerClusterNotFound):
|
||||
logger.Debug().Msg("non-existent worker cluster requested")
|
||||
return sendAPIError(e, http.StatusNotFound, "worker cluster %q not found", clusterUUID)
|
||||
case errors.Is(err, persistence.ErrWorkerTagNotFound):
|
||||
logger.Debug().Msg("non-existent worker tag requested")
|
||||
return sendAPIError(e, http.StatusNotFound, "worker tag %q not found", tagUUID)
|
||||
case err != nil:
|
||||
logger.Error().Err(err).Msg("fetching worker cluster")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker cluster: %v", err)
|
||||
logger.Error().Err(err).Msg("fetching worker tag")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tag: %v", err)
|
||||
}
|
||||
|
||||
// Update the cluster.
|
||||
dbCluster.Name = update.Name
|
||||
// Update the tag.
|
||||
dbTag.Name = update.Name
|
||||
if update.Description == nil {
|
||||
dbCluster.Description = ""
|
||||
dbTag.Description = ""
|
||||
} else {
|
||||
dbCluster.Description = *update.Description
|
||||
dbTag.Description = *update.Description
|
||||
}
|
||||
|
||||
if err := f.persist.SaveWorkerCluster(ctx, dbCluster); err != nil {
|
||||
logger.Error().Err(err).Msg("saving worker cluster")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error saving worker cluster")
|
||||
if err := f.persist.SaveWorkerTag(ctx, dbTag); err != nil {
|
||||
logger.Error().Err(err).Msg("saving worker tag")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error saving worker tag")
|
||||
}
|
||||
|
||||
// TODO: SocketIO broadcast of cluster update.
|
||||
// TODO: SocketIO broadcast of tag update.
|
||||
|
||||
return e.NoContent(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (f *Flamenco) FetchWorkerClusters(e echo.Context) error {
|
||||
func (f *Flamenco) FetchWorkerTags(e echo.Context) error {
|
||||
ctx := e.Request().Context()
|
||||
logger := requestLogger(e)
|
||||
|
||||
dbClusters, err := f.persist.FetchWorkerClusters(ctx)
|
||||
dbTags, err := f.persist.FetchWorkerTags(ctx)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("fetching worker clusters")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error saving worker cluster")
|
||||
logger.Error().Err(err).Msg("fetching worker tags")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error saving worker tag")
|
||||
}
|
||||
|
||||
apiClusters := []api.WorkerCluster{}
|
||||
for _, dbCluster := range dbClusters {
|
||||
apiCluster := workerClusterDBtoAPI(*dbCluster)
|
||||
apiClusters = append(apiClusters, apiCluster)
|
||||
apiTags := []api.WorkerTag{}
|
||||
for _, dbTag := range dbTags {
|
||||
apiTag := workerTagDBtoAPI(*dbTag)
|
||||
apiTags = append(apiTags, apiTag)
|
||||
}
|
||||
|
||||
clusterList := api.WorkerClusterList{
|
||||
Clusters: &apiClusters,
|
||||
tagList := api.WorkerTagList{
|
||||
Tags: &apiTags,
|
||||
}
|
||||
return e.JSON(http.StatusOK, &clusterList)
|
||||
return e.JSON(http.StatusOK, &tagList)
|
||||
}
|
||||
|
||||
func (f *Flamenco) CreateWorkerCluster(e echo.Context) error {
|
||||
func (f *Flamenco) CreateWorkerTag(e echo.Context) error {
|
||||
ctx := e.Request().Context()
|
||||
logger := requestLogger(e)
|
||||
|
||||
// Decode the request body.
|
||||
var apiCluster api.CreateWorkerClusterJSONBody
|
||||
if err := e.Bind(&apiCluster); err != nil {
|
||||
var apiTag api.CreateWorkerTagJSONBody
|
||||
if err := e.Bind(&apiTag); err != nil {
|
||||
logger.Warn().Err(err).Msg("bad request received")
|
||||
return sendAPIError(e, http.StatusBadRequest, "invalid format")
|
||||
}
|
||||
|
||||
// Convert to persistence layer model.
|
||||
var clusterUUID string
|
||||
if apiCluster.Id != nil && *apiCluster.Id != "" {
|
||||
clusterUUID = *apiCluster.Id
|
||||
var tagUUID string
|
||||
if apiTag.Id != nil && *apiTag.Id != "" {
|
||||
tagUUID = *apiTag.Id
|
||||
} else {
|
||||
clusterUUID = uuid.New()
|
||||
tagUUID = uuid.New()
|
||||
}
|
||||
|
||||
dbCluster := persistence.WorkerCluster{
|
||||
UUID: clusterUUID,
|
||||
Name: apiCluster.Name,
|
||||
dbTag := persistence.WorkerTag{
|
||||
UUID: tagUUID,
|
||||
Name: apiTag.Name,
|
||||
}
|
||||
if apiCluster.Description != nil {
|
||||
dbCluster.Description = *apiCluster.Description
|
||||
if apiTag.Description != nil {
|
||||
dbTag.Description = *apiTag.Description
|
||||
}
|
||||
|
||||
// Store in the database.
|
||||
if err := f.persist.CreateWorkerCluster(ctx, &dbCluster); err != nil {
|
||||
logger.Error().Err(err).Msg("creating worker cluster")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error creating worker cluster")
|
||||
if err := f.persist.CreateWorkerTag(ctx, &dbTag); err != nil {
|
||||
logger.Error().Err(err).Msg("creating worker tag")
|
||||
return sendAPIError(e, http.StatusInternalServerError, "error creating worker tag")
|
||||
}
|
||||
|
||||
// TODO: SocketIO broadcast of cluster creation.
|
||||
// TODO: SocketIO broadcast of tag creation.
|
||||
|
||||
return e.JSON(http.StatusOK, workerClusterDBtoAPI(dbCluster))
|
||||
return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag))
|
||||
}
|
||||
|
||||
func workerSummary(w persistence.Worker) api.WorkerSummary {
|
||||
@ -407,26 +407,26 @@ func workerDBtoAPI(w persistence.Worker) api.Worker {
|
||||
SupportedTaskTypes: w.TaskTypes(),
|
||||
}
|
||||
|
||||
if len(w.Clusters) > 0 {
|
||||
clusters := []api.WorkerCluster{}
|
||||
for i := range w.Clusters {
|
||||
clusters = append(clusters, workerClusterDBtoAPI(*w.Clusters[i]))
|
||||
if len(w.Tags) > 0 {
|
||||
tags := []api.WorkerTag{}
|
||||
for i := range w.Tags {
|
||||
tags = append(tags, workerTagDBtoAPI(*w.Tags[i]))
|
||||
}
|
||||
apiWorker.Clusters = &clusters
|
||||
apiWorker.Tags = &tags
|
||||
}
|
||||
|
||||
return apiWorker
|
||||
}
|
||||
|
||||
func workerClusterDBtoAPI(wc persistence.WorkerCluster) api.WorkerCluster {
|
||||
func workerTagDBtoAPI(wc persistence.WorkerTag) api.WorkerTag {
|
||||
uuid := wc.UUID // Take a copy for safety.
|
||||
|
||||
apiCluster := api.WorkerCluster{
|
||||
apiTag := api.WorkerTag{
|
||||
Id: &uuid,
|
||||
Name: wc.Name,
|
||||
}
|
||||
if len(wc.Description) > 0 {
|
||||
apiCluster.Description = &wc.Description
|
||||
apiTag.Description = &wc.Description
|
||||
}
|
||||
return apiCluster
|
||||
return apiTag
|
||||
}
|
||||
|
@ -262,58 +262,58 @@ func TestRequestWorkerStatusChangeRevert(t *testing.T) {
|
||||
assertResponseNoContent(t, echo)
|
||||
}
|
||||
|
||||
func TestWorkerClusterCRUDHappyFlow(t *testing.T) {
|
||||
func TestWorkerTagCRUDHappyFlow(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mf := newMockedFlamenco(mockCtrl)
|
||||
|
||||
// Create a cluster.
|
||||
// Create a tag.
|
||||
UUID := "18d9234e-5135-458f-a1ba-a350c3d4e837"
|
||||
apiCluster := api.WorkerCluster{
|
||||
apiTag := api.WorkerTag{
|
||||
Id: &UUID,
|
||||
Name: "ʻO nā manu ʻino",
|
||||
Description: ptr("Ke aloha"),
|
||||
}
|
||||
expectDBCluster := persistence.WorkerCluster{
|
||||
expectDBTag := persistence.WorkerTag{
|
||||
UUID: UUID,
|
||||
Name: apiCluster.Name,
|
||||
Description: *apiCluster.Description,
|
||||
Name: apiTag.Name,
|
||||
Description: *apiTag.Description,
|
||||
}
|
||||
mf.persistence.EXPECT().CreateWorkerCluster(gomock.Any(), &expectDBCluster)
|
||||
// TODO: expect SocketIO broadcast of the cluster creation.
|
||||
echo := mf.prepareMockedJSONRequest(apiCluster)
|
||||
require.NoError(t, mf.flamenco.CreateWorkerCluster(echo))
|
||||
assertResponseJSON(t, echo, http.StatusOK, &apiCluster)
|
||||
mf.persistence.EXPECT().CreateWorkerTag(gomock.Any(), &expectDBTag)
|
||||
// TODO: expect SocketIO broadcast of the tag creation.
|
||||
echo := mf.prepareMockedJSONRequest(apiTag)
|
||||
require.NoError(t, mf.flamenco.CreateWorkerTag(echo))
|
||||
assertResponseJSON(t, echo, http.StatusOK, &apiTag)
|
||||
|
||||
// Fetch the cluster
|
||||
mf.persistence.EXPECT().FetchWorkerCluster(gomock.Any(), UUID).Return(&expectDBCluster, nil)
|
||||
// Fetch the tag
|
||||
mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil)
|
||||
echo = mf.prepareMockedRequest(nil)
|
||||
require.NoError(t, mf.flamenco.FetchWorkerCluster(echo, UUID))
|
||||
assertResponseJSON(t, echo, http.StatusOK, &apiCluster)
|
||||
require.NoError(t, mf.flamenco.FetchWorkerTag(echo, UUID))
|
||||
assertResponseJSON(t, echo, http.StatusOK, &apiTag)
|
||||
|
||||
// Update & save.
|
||||
newUUID := "60442762-83d3-4fc3-bf75-6ab5799cdbaa"
|
||||
newAPICluster := api.WorkerCluster{
|
||||
newAPITag := api.WorkerTag{
|
||||
Id: &newUUID, // Intentionally change the UUID. This should just be ignored.
|
||||
Name: "updated name",
|
||||
}
|
||||
expectNewDBCluster := persistence.WorkerCluster{
|
||||
expectNewDBTag := persistence.WorkerTag{
|
||||
UUID: UUID,
|
||||
Name: newAPICluster.Name,
|
||||
Name: newAPITag.Name,
|
||||
Description: "",
|
||||
}
|
||||
// TODO: expect SocketIO broadcast of the cluster update.
|
||||
mf.persistence.EXPECT().FetchWorkerCluster(gomock.Any(), UUID).Return(&expectDBCluster, nil)
|
||||
mf.persistence.EXPECT().SaveWorkerCluster(gomock.Any(), &expectNewDBCluster)
|
||||
echo = mf.prepareMockedJSONRequest(newAPICluster)
|
||||
require.NoError(t, mf.flamenco.UpdateWorkerCluster(echo, UUID))
|
||||
// TODO: expect SocketIO broadcast of the tag update.
|
||||
mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil)
|
||||
mf.persistence.EXPECT().SaveWorkerTag(gomock.Any(), &expectNewDBTag)
|
||||
echo = mf.prepareMockedJSONRequest(newAPITag)
|
||||
require.NoError(t, mf.flamenco.UpdateWorkerTag(echo, UUID))
|
||||
assertResponseNoContent(t, echo)
|
||||
|
||||
// Delete.
|
||||
mf.persistence.EXPECT().DeleteWorkerCluster(gomock.Any(), UUID)
|
||||
// TODO: expect SocketIO broadcast of the cluster deletion.
|
||||
echo = mf.prepareMockedJSONRequest(newAPICluster)
|
||||
require.NoError(t, mf.flamenco.DeleteWorkerCluster(echo, UUID))
|
||||
mf.persistence.EXPECT().DeleteWorkerTag(gomock.Any(), UUID)
|
||||
// TODO: expect SocketIO broadcast of the tag deletion.
|
||||
echo = mf.prepareMockedJSONRequest(newAPITag)
|
||||
require.NoError(t, mf.flamenco.DeleteWorkerTag(echo, UUID))
|
||||
assertResponseNoContent(t, echo)
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ type Author struct {
|
||||
|
||||
type AuthoredJob struct {
|
||||
JobID string
|
||||
WorkerClusterUUID string
|
||||
WorkerTagUUID string
|
||||
|
||||
Name string
|
||||
JobType string
|
||||
|
@ -127,8 +127,8 @@ func (s *Service) Compile(ctx context.Context, sj api.SubmittedJob) (*AuthoredJo
|
||||
aj.Storage.ShamanCheckoutID = *sj.Storage.ShamanCheckoutId
|
||||
}
|
||||
|
||||
if sj.WorkerCluster != nil {
|
||||
aj.WorkerClusterUUID = *sj.WorkerCluster
|
||||
if sj.WorkerTag != nil {
|
||||
aj.WorkerTagUUID = *sj.WorkerTag
|
||||
}
|
||||
|
||||
compiler, err := vm.getCompileJob()
|
||||
|
@ -50,7 +50,7 @@ func exampleSubmittedJob() api.SubmittedJob {
|
||||
Type: "simple-blender-render",
|
||||
Settings: &settings,
|
||||
Metadata: &metadata,
|
||||
WorkerCluster: ptr("acce9983-e663-4210-b3cc-f7bfa629cb21"),
|
||||
WorkerTag: ptr("acce9983-e663-4210-b3cc-f7bfa629cb21"),
|
||||
}
|
||||
return sj
|
||||
}
|
||||
@ -80,7 +80,7 @@ func TestSimpleBlenderRenderHappy(t *testing.T) {
|
||||
|
||||
// Properties should be copied as-is.
|
||||
assert.Equal(t, sj.Name, aj.Name)
|
||||
assert.Equal(t, *sj.WorkerCluster, aj.WorkerClusterUUID)
|
||||
assert.Equal(t, *sj.WorkerTag, aj.WorkerTagUUID)
|
||||
assert.Equal(t, sj.Type, aj.JobType)
|
||||
assert.Equal(t, sj.Priority, aj.Priority)
|
||||
assert.EqualValues(t, sj.Settings.AdditionalProperties, aj.Settings)
|
||||
@ -139,7 +139,7 @@ func TestSimpleBlenderRenderHappy(t *testing.T) {
|
||||
assert.Equal(t, expectDeps, tVideo.Dependencies)
|
||||
}
|
||||
|
||||
func TestJobWithoutCluster(t *testing.T) {
|
||||
func TestJobWithoutTag(t *testing.T) {
|
||||
c := mockedClock(t)
|
||||
|
||||
s, err := Load(c)
|
||||
@ -151,20 +151,20 @@ func TestJobWithoutCluster(t *testing.T) {
|
||||
|
||||
sj := exampleSubmittedJob()
|
||||
|
||||
// Try with nil WorkerCluster.
|
||||
// Try with nil WorkerTag.
|
||||
{
|
||||
sj.WorkerCluster = nil
|
||||
sj.WorkerTag = nil
|
||||
aj, err := s.Compile(ctx, sj)
|
||||
require.NoError(t, err)
|
||||
assert.Zero(t, aj.WorkerClusterUUID)
|
||||
assert.Zero(t, aj.WorkerTagUUID)
|
||||
}
|
||||
|
||||
// Try with empty WorkerCluster.
|
||||
// Try with empty WorkerTag.
|
||||
{
|
||||
sj.WorkerCluster = ptr("")
|
||||
sj.WorkerTag = ptr("")
|
||||
aj, err := s.Compile(ctx, sj)
|
||||
require.NoError(t, err)
|
||||
assert.Zero(t, aj.WorkerClusterUUID)
|
||||
assert.Zero(t, aj.WorkerTagUUID)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,7 @@ func (db *DB) migrate() error {
|
||||
&Task{},
|
||||
&TaskFailure{},
|
||||
&Worker{},
|
||||
&WorkerCluster{},
|
||||
&WorkerTag{},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to automigrate database: %v", err)
|
||||
|
@ -12,7 +12,7 @@ var (
|
||||
ErrJobNotFound = PersistenceError{Message: "job not found", Err: gorm.ErrRecordNotFound}
|
||||
ErrTaskNotFound = PersistenceError{Message: "task not found", Err: gorm.ErrRecordNotFound}
|
||||
ErrWorkerNotFound = PersistenceError{Message: "worker not found", Err: gorm.ErrRecordNotFound}
|
||||
ErrWorkerClusterNotFound = PersistenceError{Message: "worker cluster not found", Err: gorm.ErrRecordNotFound}
|
||||
ErrWorkerTagNotFound = PersistenceError{Message: "worker tag not found", Err: gorm.ErrRecordNotFound}
|
||||
)
|
||||
|
||||
type PersistenceError struct {
|
||||
@ -40,8 +40,8 @@ func workerError(errorToWrap error, message string, msgArgs ...interface{}) erro
|
||||
return wrapError(translateGormWorkerError(errorToWrap), message, msgArgs...)
|
||||
}
|
||||
|
||||
func workerClusterError(errorToWrap error, message string, msgArgs ...interface{}) error {
|
||||
return wrapError(translateGormWorkerClusterError(errorToWrap), message, msgArgs...)
|
||||
func workerTagError(errorToWrap error, message string, msgArgs ...interface{}) error {
|
||||
return wrapError(translateGormWorkerTagError(errorToWrap), message, msgArgs...)
|
||||
}
|
||||
|
||||
func wrapError(errorToWrap error, message string, format ...interface{}) error {
|
||||
@ -86,11 +86,11 @@ func translateGormWorkerError(gormError error) error {
|
||||
return gormError
|
||||
}
|
||||
|
||||
// translateGormWorkerClusterError translates a Gorm error to a persistence layer error.
|
||||
// translateGormWorkerTagError translates a Gorm error to a persistence layer error.
|
||||
// This helps to keep Gorm as "implementation detail" of the persistence layer.
|
||||
func translateGormWorkerClusterError(gormError error) error {
|
||||
func translateGormWorkerTagError(gormError error) error {
|
||||
if errors.Is(gormError, gorm.ErrRecordNotFound) {
|
||||
return ErrWorkerClusterNotFound
|
||||
return ErrWorkerTagNotFound
|
||||
}
|
||||
return gormError
|
||||
}
|
||||
|
@ -36,8 +36,8 @@ type Job struct {
|
||||
|
||||
Storage JobStorageInfo `gorm:"embedded;embeddedPrefix:storage_"`
|
||||
|
||||
WorkerClusterID *uint
|
||||
WorkerCluster *WorkerCluster `gorm:"foreignkey:WorkerClusterID;references:ID;constraint:OnDelete:SET NULL"`
|
||||
WorkerTagID *uint
|
||||
WorkerTag *WorkerTag `gorm:"foreignkey:WorkerTagID;references:ID;constraint:OnDelete:SET NULL"`
|
||||
}
|
||||
|
||||
type StringInterfaceMap map[string]interface{}
|
||||
@ -148,14 +148,14 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au
|
||||
},
|
||||
}
|
||||
|
||||
// Find and assign the worker cluster.
|
||||
if authoredJob.WorkerClusterUUID != "" {
|
||||
dbCluster, err := fetchWorkerCluster(tx, authoredJob.WorkerClusterUUID)
|
||||
// Find and assign the worker tag.
|
||||
if authoredJob.WorkerTagUUID != "" {
|
||||
dbTag, err := fetchWorkerTag(tx, authoredJob.WorkerTagUUID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dbJob.WorkerClusterID = &dbCluster.ID
|
||||
dbJob.WorkerCluster = dbCluster
|
||||
dbJob.WorkerTagID = &dbTag.ID
|
||||
dbJob.WorkerTag = dbTag
|
||||
}
|
||||
|
||||
if err := tx.Create(&dbJob).Error; err != nil {
|
||||
@ -233,7 +233,7 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
|
||||
dbJob := Job{}
|
||||
findResult := db.gormDB.WithContext(ctx).
|
||||
Limit(1).
|
||||
Preload("WorkerCluster").
|
||||
Preload("WorkerTag").
|
||||
Find(&dbJob, "uuid = ?", jobUUID)
|
||||
if findResult.Error != nil {
|
||||
return nil, jobError(findResult.Error, "fetching job")
|
||||
|
@ -108,16 +108,16 @@ func (db *DB) WorkersLeftToRun(ctx context.Context, job *Job, taskType string) (
|
||||
Select("uuid").
|
||||
Where("id not in (?)", blockedWorkers)
|
||||
|
||||
if job.WorkerClusterID == nil {
|
||||
if job.WorkerTagID == nil {
|
||||
// Count all workers, so no extra restrictions are necessary.
|
||||
} else {
|
||||
// Only count workers in the job's cluster.
|
||||
jobCluster := db.gormDB.
|
||||
Table("worker_cluster_membership").
|
||||
// Only count workers in the job's tag.
|
||||
jobTag := db.gormDB.
|
||||
Table("worker_tag_membership").
|
||||
Select("worker_id").
|
||||
Where("worker_cluster_id = ?", *job.WorkerClusterID)
|
||||
Where("worker_tag_id = ?", *job.WorkerTagID)
|
||||
query = query.
|
||||
Where("id in (?)", jobCluster)
|
||||
Where("id in (?)", jobTag)
|
||||
}
|
||||
|
||||
// Find the workers NOT blocked.
|
||||
|
@ -126,14 +126,14 @@ func TestWorkersLeftToRun(t *testing.T) {
|
||||
worker1 := createWorker(ctx, t, db)
|
||||
worker2 := createWorkerFrom(ctx, t, db, *worker1)
|
||||
|
||||
// Create one worker cluster. It will not be used by this job, but one of the
|
||||
// Create one worker tag. It will not be used by this job, but one of the
|
||||
// workers will be assigned to it. It can get this job's tasks, though.
|
||||
// Because the job is clusterless, it can be run by all.
|
||||
cluster1 := WorkerCluster{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"}
|
||||
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1))
|
||||
// Because the job is tagless, it can be run by all.
|
||||
tag1 := WorkerTag{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"}
|
||||
require.NoError(t, db.CreateWorkerTag(ctx, &tag1))
|
||||
workerC1 := createWorker(ctx, t, db, func(w *Worker) {
|
||||
w.UUID = "c1c1c1c1-0000-1111-2222-333333333333"
|
||||
w.Clusters = []*WorkerCluster{&cluster1}
|
||||
w.Tags = []*WorkerTag{&tag1}
|
||||
})
|
||||
|
||||
uuidMap := func(workers ...*Worker) map[string]bool {
|
||||
@ -172,43 +172,43 @@ func TestWorkersLeftToRun(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorkersLeftToRunWithClusters(t *testing.T) {
|
||||
func TestWorkersLeftToRunWithTags(t *testing.T) {
|
||||
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Create clusters.
|
||||
cluster1 := WorkerCluster{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"}
|
||||
cluster2 := WorkerCluster{UUID: "22257623-4b14-4801-bee2-271dddab6309", Name: "Cluster 2"}
|
||||
cluster3 := WorkerCluster{UUID: "33357623-4b14-4801-bee2-271dddab6309", Name: "Cluster 3"}
|
||||
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1))
|
||||
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster2))
|
||||
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster3))
|
||||
// Create tags.
|
||||
tag1 := WorkerTag{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"}
|
||||
tag2 := WorkerTag{UUID: "22257623-4b14-4801-bee2-271dddab6309", Name: "Tag 2"}
|
||||
tag3 := WorkerTag{UUID: "33357623-4b14-4801-bee2-271dddab6309", Name: "Tag 3"}
|
||||
require.NoError(t, db.CreateWorkerTag(ctx, &tag1))
|
||||
require.NoError(t, db.CreateWorkerTag(ctx, &tag2))
|
||||
require.NoError(t, db.CreateWorkerTag(ctx, &tag3))
|
||||
|
||||
// Create a job in cluster1.
|
||||
// Create a job in tag1.
|
||||
authoredJob := createTestAuthoredJobWithTasks()
|
||||
authoredJob.WorkerClusterUUID = cluster1.UUID
|
||||
authoredJob.WorkerTagUUID = tag1.UUID
|
||||
job := persistAuthoredJob(t, ctx, db, authoredJob)
|
||||
|
||||
// Clusters 1 + 3
|
||||
// Tags 1 + 3
|
||||
workerC13 := createWorker(ctx, t, db, func(w *Worker) {
|
||||
w.UUID = "c13c1313-0000-1111-2222-333333333333"
|
||||
w.Clusters = []*WorkerCluster{&cluster1, &cluster3}
|
||||
w.Tags = []*WorkerTag{&tag1, &tag3}
|
||||
})
|
||||
// Cluster 1
|
||||
// Tag 1
|
||||
workerC1 := createWorker(ctx, t, db, func(w *Worker) {
|
||||
w.UUID = "c1c1c1c1-0000-1111-2222-333333333333"
|
||||
w.Clusters = []*WorkerCluster{&cluster1}
|
||||
w.Tags = []*WorkerTag{&tag1}
|
||||
})
|
||||
// Cluster 2 worker, this one should never appear.
|
||||
// Tag 2 worker, this one should never appear.
|
||||
createWorker(ctx, t, db, func(w *Worker) {
|
||||
w.UUID = "c2c2c2c2-0000-1111-2222-333333333333"
|
||||
w.Clusters = []*WorkerCluster{&cluster2}
|
||||
w.Tags = []*WorkerTag{&tag2}
|
||||
})
|
||||
// No clusters, so should be able to run only clusterless jobs. Which is none
|
||||
// No tags, so should be able to run only tagless jobs. Which is none
|
||||
// in this test.
|
||||
createWorker(ctx, t, db, func(w *Worker) {
|
||||
w.UUID = "00000000-0000-1111-2222-333333333333"
|
||||
w.Clusters = nil
|
||||
w.Tags = nil
|
||||
})
|
||||
|
||||
uuidMap := func(workers ...*Worker) map[string]bool {
|
||||
@ -219,7 +219,7 @@ func TestWorkersLeftToRunWithClusters(t *testing.T) {
|
||||
return theMap
|
||||
}
|
||||
|
||||
// All Cluster 1 workers, no blocklist.
|
||||
// All Tag 1 workers, no blocklist.
|
||||
left, err := db.WorkersLeftToRun(ctx, job, "blender")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, uuidMap(workerC13, workerC1), left)
|
||||
@ -230,7 +230,7 @@ func TestWorkersLeftToRunWithClusters(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, uuidMap(workerC13), left)
|
||||
|
||||
// All clustered workers blocked.
|
||||
// All taged workers blocked.
|
||||
_ = db.AddWorkerToJobBlocklist(ctx, job, workerC13, "blender")
|
||||
left, err = db.WorkersLeftToRun(ctx, job, "blender")
|
||||
assert.NoError(t, err)
|
||||
|
@ -64,7 +64,7 @@ func (db *DB) QueryJobs(ctx context.Context, apiQ api.JobsQuery) ([]*Job, error)
|
||||
}
|
||||
}
|
||||
|
||||
q.Preload("Cluster")
|
||||
q.Preload("Tag")
|
||||
|
||||
result := []*Job{}
|
||||
tx := q.Scan(&result)
|
||||
|
@ -757,7 +757,7 @@ func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*W
|
||||
Software: "3.0",
|
||||
Status: api.WorkerStatusAwake,
|
||||
SupportedTaskTypes: "blender,ffmpeg,file-management",
|
||||
Clusters: nil,
|
||||
Tags: nil,
|
||||
}
|
||||
|
||||
for _, updater := range updaters {
|
||||
|
@ -26,7 +26,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) {
|
||||
logger := log.With().Str("worker", w.UUID).Logger()
|
||||
logger.Trace().Msg("finding task for worker")
|
||||
|
||||
hasWorkerClusters, err := db.HasWorkerClusters(ctx)
|
||||
hasWorkerTags, err := db.HasWorkerTags(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -37,7 +37,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) {
|
||||
var task *Task
|
||||
txErr := db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
var err error
|
||||
task, err = findTaskForWorker(tx, w, hasWorkerClusters)
|
||||
task, err = findTaskForWorker(tx, w, hasWorkerTags)
|
||||
if err != nil {
|
||||
if isDatabaseBusyError(err) {
|
||||
logger.Trace().Err(err).Msg("database busy while finding task for worker")
|
||||
@ -84,7 +84,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) {
|
||||
return task, nil
|
||||
}
|
||||
|
||||
func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerClusters bool) (*Task, error) {
|
||||
func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerTags bool) (*Task, error) {
|
||||
task := Task{}
|
||||
|
||||
// If a task is alreay active & assigned to this worker, return just that.
|
||||
@ -129,21 +129,21 @@ func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerClusters bool) (*Task,
|
||||
Where("TF.worker_id is NULL"). // Not failed before
|
||||
Where("tasks.type not in (?)", blockedTaskTypesQuery) // Non-blocklisted
|
||||
|
||||
if checkWorkerClusters {
|
||||
// The system has one or more clusters, so limit the available jobs to those
|
||||
// that have no cluster, or overlap with the Worker's clusters.
|
||||
if len(w.Clusters) == 0 {
|
||||
// Clusterless workers only get clusterless jobs.
|
||||
if checkWorkerTags {
|
||||
// The system has one or more tags, so limit the available jobs to those
|
||||
// that have no tag, or overlap with the Worker's tags.
|
||||
if len(w.Tags) == 0 {
|
||||
// Tagless workers only get tagless jobs.
|
||||
findTaskQuery = findTaskQuery.
|
||||
Where("jobs.worker_cluster_id is NULL")
|
||||
Where("jobs.worker_tag_id is NULL")
|
||||
} else {
|
||||
// Clustered workers get clusterless jobs AND jobs of their own clusters.
|
||||
clusterIDs := []uint{}
|
||||
for _, cluster := range w.Clusters {
|
||||
clusterIDs = append(clusterIDs, cluster.ID)
|
||||
// Taged workers get tagless jobs AND jobs of their own tags.
|
||||
tagIDs := []uint{}
|
||||
for _, tag := range w.Tags {
|
||||
tagIDs = append(tagIDs, tag.ID)
|
||||
}
|
||||
findTaskQuery = findTaskQuery.
|
||||
Where("jobs.worker_cluster_id is NULL or worker_cluster_id in ?", clusterIDs)
|
||||
Where("jobs.worker_tag_id is NULL or worker_tag_id in ?", tagIDs)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,87 +291,87 @@ func TestPreviouslyFailed(t *testing.T) {
|
||||
assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen")
|
||||
}
|
||||
|
||||
func TestWorkerClusterJobWithCluster(t *testing.T) {
|
||||
func TestWorkerTagJobWithTag(t *testing.T) {
|
||||
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Create worker clusters:
|
||||
cluster1 := WorkerCluster{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"}
|
||||
cluster2 := WorkerCluster{UUID: "2f71dba1-cf92-4752-8386-f5926affabd5", Name: "Cluster 2"}
|
||||
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1))
|
||||
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster2))
|
||||
// Create worker tags:
|
||||
tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"}
|
||||
tag2 := WorkerTag{UUID: "2f71dba1-cf92-4752-8386-f5926affabd5", Name: "Tag 2"}
|
||||
require.NoError(t, db.CreateWorkerTag(ctx, &tag1))
|
||||
require.NoError(t, db.CreateWorkerTag(ctx, &tag2))
|
||||
|
||||
// Create a worker in cluster1:
|
||||
// Create a worker in tag1:
|
||||
workerC := linuxWorker(t, db, func(w *Worker) {
|
||||
w.Clusters = []*WorkerCluster{&cluster1}
|
||||
w.Tags = []*WorkerTag{&tag1}
|
||||
})
|
||||
|
||||
// Create a worker without cluster:
|
||||
// Create a worker without tag:
|
||||
workerNC := linuxWorker(t, db, func(w *Worker) {
|
||||
w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e"
|
||||
w.Clusters = nil
|
||||
w.Tags = nil
|
||||
})
|
||||
|
||||
{ // Test job with different cluster:
|
||||
{ // Test job with different tag:
|
||||
authTask := authorTestTask("the task", "blender")
|
||||
job := authorTestJob("499cf0f8-e83d-4cb1-837a-df94789d07db", "simple-blender-render", authTask)
|
||||
job.WorkerClusterUUID = cluster2.UUID
|
||||
job.WorkerTagUUID = tag2.UUID
|
||||
constructTestJob(ctx, t, db, job)
|
||||
|
||||
task, err := db.ScheduleTask(ctx, &workerC)
|
||||
require.NoError(t, err)
|
||||
assert.Nil(t, task, "job with different cluster should not be scheduled")
|
||||
assert.Nil(t, task, "job with different tag should not be scheduled")
|
||||
}
|
||||
|
||||
{ // Test job with matching cluster:
|
||||
{ // Test job with matching tag:
|
||||
authTask := authorTestTask("the task", "blender")
|
||||
job := authorTestJob("5d4c2321-0bb7-4c13-a9dd-32a2c0cd156e", "simple-blender-render", authTask)
|
||||
job.WorkerClusterUUID = cluster1.UUID
|
||||
job.WorkerTagUUID = tag1.UUID
|
||||
constructTestJob(ctx, t, db, job)
|
||||
|
||||
task, err := db.ScheduleTask(ctx, &workerC)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, task, "job with matching cluster should be scheduled")
|
||||
require.NotNil(t, task, "job with matching tag should be scheduled")
|
||||
assert.Equal(t, authTask.UUID, task.UUID)
|
||||
|
||||
task, err = db.ScheduleTask(ctx, &workerNC)
|
||||
require.NoError(t, err)
|
||||
assert.Nil(t, task, "job with cluster should not be scheduled for worker without cluster")
|
||||
assert.Nil(t, task, "job with tag should not be scheduled for worker without tag")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorkerClusterJobWithoutCluster(t *testing.T) {
|
||||
func TestWorkerTagJobWithoutTag(t *testing.T) {
|
||||
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Create worker cluster:
|
||||
cluster1 := WorkerCluster{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"}
|
||||
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1))
|
||||
// Create worker tag:
|
||||
tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"}
|
||||
require.NoError(t, db.CreateWorkerTag(ctx, &tag1))
|
||||
|
||||
// Create a worker in cluster1:
|
||||
// Create a worker in tag1:
|
||||
workerC := linuxWorker(t, db, func(w *Worker) {
|
||||
w.Clusters = []*WorkerCluster{&cluster1}
|
||||
w.Tags = []*WorkerTag{&tag1}
|
||||
})
|
||||
|
||||
// Create a worker without cluster:
|
||||
// Create a worker without tag:
|
||||
workerNC := linuxWorker(t, db, func(w *Worker) {
|
||||
w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e"
|
||||
w.Clusters = nil
|
||||
w.Tags = nil
|
||||
})
|
||||
|
||||
// Test cluster-less job:
|
||||
// Test tag-less job:
|
||||
authTask := authorTestTask("the task", "blender")
|
||||
job := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
|
||||
constructTestJob(ctx, t, db, job)
|
||||
|
||||
task, err := db.ScheduleTask(ctx, &workerC)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, task, "job without cluster should always be scheduled to worker in some cluster")
|
||||
require.NotNil(t, task, "job without tag should always be scheduled to worker in some tag")
|
||||
assert.Equal(t, authTask.UUID, task.UUID)
|
||||
|
||||
task, err = db.ScheduleTask(ctx, &workerNC)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, task, "job without cluster should always be scheduled to worker without cluster")
|
||||
require.NotNil(t, task, "job without tag should always be scheduled to worker without tag")
|
||||
assert.Equal(t, authTask.UUID, task.UUID)
|
||||
}
|
||||
|
||||
|
@ -97,7 +97,7 @@ type WorkerTestFixture struct {
|
||||
done func()
|
||||
|
||||
worker *Worker
|
||||
cluster *WorkerCluster
|
||||
tag *WorkerTag
|
||||
}
|
||||
|
||||
func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTestFixture {
|
||||
@ -113,14 +113,14 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe
|
||||
SupportedTaskTypes: "blender,ffmpeg,file-management",
|
||||
}
|
||||
|
||||
wc := WorkerCluster{
|
||||
wc := WorkerTag{
|
||||
UUID: uuid.New(),
|
||||
Name: "arbejdsklynge",
|
||||
Description: "Worker cluster in Danish",
|
||||
Description: "Worker tag in Danish",
|
||||
}
|
||||
|
||||
require.NoError(t, db.CreateWorker(ctx, &w))
|
||||
require.NoError(t, db.CreateWorkerCluster(ctx, &wc))
|
||||
require.NoError(t, db.CreateWorkerTag(ctx, &wc))
|
||||
|
||||
return WorkerTestFixture{
|
||||
db: db,
|
||||
@ -128,6 +128,6 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe
|
||||
done: cancel,
|
||||
|
||||
worker: &w,
|
||||
cluster: &wc,
|
||||
tag: &wc,
|
||||
}
|
||||
}
|
||||
|
@ -1,112 +0,0 @@
|
||||
package persistence
|
||||
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type WorkerCluster struct {
|
||||
Model
|
||||
|
||||
UUID string `gorm:"type:char(36);default:'';unique;index"`
|
||||
Name string `gorm:"type:varchar(64);default:'';unique"`
|
||||
Description string `gorm:"type:varchar(255);default:''"`
|
||||
|
||||
Workers []*Worker `gorm:"many2many:worker_cluster_membership;constraint:OnDelete:CASCADE"`
|
||||
}
|
||||
|
||||
func (db *DB) CreateWorkerCluster(ctx context.Context, wc *WorkerCluster) error {
|
||||
if err := db.gormDB.WithContext(ctx).Create(wc).Error; err != nil {
|
||||
return fmt.Errorf("creating new worker cluster: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasWorkerClusters returns whether there are any clusters defined at all.
|
||||
func (db *DB) HasWorkerClusters(ctx context.Context) (bool, error) {
|
||||
var count int64
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Model(&WorkerCluster{}).
|
||||
Count(&count)
|
||||
if err := tx.Error; err != nil {
|
||||
return false, workerClusterError(err, "counting worker clusters")
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func (db *DB) FetchWorkerCluster(ctx context.Context, uuid string) (*WorkerCluster, error) {
|
||||
tx := db.gormDB.WithContext(ctx)
|
||||
return fetchWorkerCluster(tx, uuid)
|
||||
}
|
||||
|
||||
// fetchWorkerCluster fetches the worker cluster using the given database instance.
|
||||
func fetchWorkerCluster(gormDB *gorm.DB, uuid string) (*WorkerCluster, error) {
|
||||
w := WorkerCluster{}
|
||||
tx := gormDB.First(&w, "uuid = ?", uuid)
|
||||
if tx.Error != nil {
|
||||
return nil, workerClusterError(tx.Error, "fetching worker cluster")
|
||||
}
|
||||
return &w, nil
|
||||
}
|
||||
|
||||
func (db *DB) SaveWorkerCluster(ctx context.Context, cluster *WorkerCluster) error {
|
||||
if err := db.gormDB.WithContext(ctx).Save(cluster).Error; err != nil {
|
||||
return workerClusterError(err, "saving worker cluster")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteWorkerCluster deletes the given cluster, after unassigning all workers from it.
|
||||
func (db *DB) DeleteWorkerCluster(ctx context.Context, uuid string) error {
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Where("uuid = ?", uuid).
|
||||
Delete(&WorkerCluster{})
|
||||
if tx.Error != nil {
|
||||
return workerClusterError(tx.Error, "deleting worker cluster")
|
||||
}
|
||||
if tx.RowsAffected == 0 {
|
||||
return ErrWorkerClusterNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) FetchWorkerClusters(ctx context.Context) ([]*WorkerCluster, error) {
|
||||
clusters := make([]*WorkerCluster, 0)
|
||||
tx := db.gormDB.WithContext(ctx).Model(&WorkerCluster{}).Scan(&clusters)
|
||||
if tx.Error != nil {
|
||||
return nil, workerClusterError(tx.Error, "fetching all worker clusters")
|
||||
}
|
||||
return clusters, nil
|
||||
}
|
||||
|
||||
func (db *DB) fetchWorkerClustersWithUUID(ctx context.Context, clusterUUIDs []string) ([]*WorkerCluster, error) {
|
||||
clusters := make([]*WorkerCluster, 0)
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Model(&WorkerCluster{}).
|
||||
Where("uuid in ?", clusterUUIDs).
|
||||
Scan(&clusters)
|
||||
if tx.Error != nil {
|
||||
return nil, workerClusterError(tx.Error, "fetching all worker clusters")
|
||||
}
|
||||
return clusters, nil
|
||||
}
|
||||
|
||||
func (db *DB) WorkerSetClusters(ctx context.Context, worker *Worker, clusterUUIDs []string) error {
|
||||
clusters, err := db.fetchWorkerClustersWithUUID(ctx, clusterUUIDs)
|
||||
if err != nil {
|
||||
return workerClusterError(err, "fetching worker clusters")
|
||||
}
|
||||
|
||||
err = db.gormDB.WithContext(ctx).
|
||||
Model(worker).
|
||||
Association("Clusters").
|
||||
Replace(clusters)
|
||||
if err != nil {
|
||||
return workerClusterError(err, "updating worker clusters")
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,165 +0,0 @@
|
||||
package persistence
|
||||
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.blender.org/flamenco/internal/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCreateFetchCluster(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
// Test fetching non-existent cluster
|
||||
fetchedCluster, err := f.db.FetchWorkerCluster(f.ctx, "7ee21bc8-ff1a-42d2-a6b6-cc4b529b189f")
|
||||
assert.ErrorIs(t, err, ErrWorkerClusterNotFound)
|
||||
assert.Nil(t, fetchedCluster)
|
||||
|
||||
// New cluster creation is already done in the workerTestFixtures() call.
|
||||
assert.NotNil(t, f.cluster)
|
||||
|
||||
fetchedCluster, err = f.db.FetchWorkerCluster(f.ctx, f.cluster.UUID)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, fetchedCluster)
|
||||
|
||||
// Test contents of fetched cluster.
|
||||
assert.Equal(t, f.cluster.UUID, fetchedCluster.UUID)
|
||||
assert.Equal(t, f.cluster.Name, fetchedCluster.Name)
|
||||
assert.Equal(t, f.cluster.Description, fetchedCluster.Description)
|
||||
assert.Zero(t, fetchedCluster.Workers)
|
||||
}
|
||||
|
||||
func TestFetchDeleteClusters(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
// Single cluster was created by fixture.
|
||||
has, err := f.db.HasWorkerClusters(f.ctx)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, has, "expecting HasWorkerClusters to return true")
|
||||
|
||||
secondCluster := WorkerCluster{
|
||||
UUID: uuid.New(),
|
||||
Name: "arbeiderscluster",
|
||||
Description: "Worker cluster in Dutch",
|
||||
}
|
||||
|
||||
require.NoError(t, f.db.CreateWorkerCluster(f.ctx, &secondCluster))
|
||||
|
||||
allClusters, err := f.db.FetchWorkerClusters(f.ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, allClusters, 2)
|
||||
var allClusterIDs [2]string
|
||||
for idx := range allClusters {
|
||||
allClusterIDs[idx] = allClusters[idx].UUID
|
||||
}
|
||||
assert.Contains(t, allClusterIDs, f.cluster.UUID)
|
||||
assert.Contains(t, allClusterIDs, secondCluster.UUID)
|
||||
|
||||
has, err = f.db.HasWorkerClusters(f.ctx)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, has, "expecting HasWorkerClusters to return true")
|
||||
|
||||
// Test deleting the 2nd cluster.
|
||||
require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, secondCluster.UUID))
|
||||
|
||||
allClusters, err = f.db.FetchWorkerClusters(f.ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, allClusters, 1)
|
||||
assert.Equal(t, f.cluster.UUID, allClusters[0].UUID)
|
||||
|
||||
// Test deleting the 1st cluster.
|
||||
require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, f.cluster.UUID))
|
||||
has, err = f.db.HasWorkerClusters(f.ctx)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, has, "expecting HasWorkerClusters to return false")
|
||||
}
|
||||
|
||||
func TestAssignUnassignWorkerClusters(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
assertClusters := func(msgLabel string, clusterUUIDs ...string) {
|
||||
w, err := f.db.FetchWorker(f.ctx, f.worker.UUID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Catch doubly-reported clusters, as the maps below would hide those cases.
|
||||
assert.Len(t, w.Clusters, len(clusterUUIDs), msgLabel)
|
||||
|
||||
expectClusters := make(map[string]bool)
|
||||
for _, cid := range clusterUUIDs {
|
||||
expectClusters[cid] = true
|
||||
}
|
||||
|
||||
actualClusters := make(map[string]bool)
|
||||
for _, c := range w.Clusters {
|
||||
actualClusters[c.UUID] = true
|
||||
}
|
||||
|
||||
assert.Equal(t, expectClusters, actualClusters, msgLabel)
|
||||
}
|
||||
|
||||
secondCluster := WorkerCluster{
|
||||
UUID: uuid.New(),
|
||||
Name: "arbeiderscluster",
|
||||
Description: "Worker cluster in Dutch",
|
||||
}
|
||||
|
||||
require.NoError(t, f.db.CreateWorkerCluster(f.ctx, &secondCluster))
|
||||
|
||||
// By default the Worker should not be part of a cluster.
|
||||
assertClusters("default cluster assignment")
|
||||
|
||||
require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID}))
|
||||
assertClusters("setting one cluster", f.cluster.UUID)
|
||||
|
||||
// Double assignments should also just work.
|
||||
require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID, f.cluster.UUID}))
|
||||
assertClusters("setting twice the same cluster", f.cluster.UUID)
|
||||
|
||||
// Multiple cluster memberships.
|
||||
require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID, secondCluster.UUID}))
|
||||
assertClusters("setting two different clusters", f.cluster.UUID, secondCluster.UUID)
|
||||
|
||||
// Remove memberships.
|
||||
require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{secondCluster.UUID}))
|
||||
assertClusters("unassigning from first cluster", secondCluster.UUID)
|
||||
require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{}))
|
||||
assertClusters("unassigning from second cluster")
|
||||
}
|
||||
|
||||
func TestSaveWorkerCluster(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
f.cluster.Name = "übercluster"
|
||||
f.cluster.Description = "ʻO kēlā hui ma laila"
|
||||
require.NoError(t, f.db.SaveWorkerCluster(f.ctx, f.cluster))
|
||||
|
||||
fetched, err := f.db.FetchWorkerCluster(f.ctx, f.cluster.UUID)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, f.cluster.Name, fetched.Name)
|
||||
assert.Equal(t, f.cluster.Description, fetched.Description)
|
||||
}
|
||||
|
||||
func TestDeleteWorkerClusterWithWorkersAssigned(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
// Assign the worker.
|
||||
require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID}))
|
||||
|
||||
// Delete the cluster.
|
||||
require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, f.cluster.UUID))
|
||||
|
||||
// Check the Worker has been unassigned from the cluster.
|
||||
w, err := f.db.FetchWorker(f.ctx, f.worker.UUID)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, w.Clusters)
|
||||
}
|
112
internal/manager/persistence/worker_tag.go
Normal file
112
internal/manager/persistence/worker_tag.go
Normal file
@ -0,0 +1,112 @@
|
||||
package persistence
|
||||
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type WorkerTag struct {
|
||||
Model
|
||||
|
||||
UUID string `gorm:"type:char(36);default:'';unique;index"`
|
||||
Name string `gorm:"type:varchar(64);default:'';unique"`
|
||||
Description string `gorm:"type:varchar(255);default:''"`
|
||||
|
||||
Workers []*Worker `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"`
|
||||
}
|
||||
|
||||
func (db *DB) CreateWorkerTag(ctx context.Context, wc *WorkerTag) error {
|
||||
if err := db.gormDB.WithContext(ctx).Create(wc).Error; err != nil {
|
||||
return fmt.Errorf("creating new worker tag: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasWorkerTags returns whether there are any tags defined at all.
|
||||
func (db *DB) HasWorkerTags(ctx context.Context) (bool, error) {
|
||||
var count int64
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Model(&WorkerTag{}).
|
||||
Count(&count)
|
||||
if err := tx.Error; err != nil {
|
||||
return false, workerTagError(err, "counting worker tags")
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func (db *DB) FetchWorkerTag(ctx context.Context, uuid string) (*WorkerTag, error) {
|
||||
tx := db.gormDB.WithContext(ctx)
|
||||
return fetchWorkerTag(tx, uuid)
|
||||
}
|
||||
|
||||
// fetchWorkerTag fetches the worker tag using the given database instance.
|
||||
func fetchWorkerTag(gormDB *gorm.DB, uuid string) (*WorkerTag, error) {
|
||||
w := WorkerTag{}
|
||||
tx := gormDB.First(&w, "uuid = ?", uuid)
|
||||
if tx.Error != nil {
|
||||
return nil, workerTagError(tx.Error, "fetching worker tag")
|
||||
}
|
||||
return &w, nil
|
||||
}
|
||||
|
||||
func (db *DB) SaveWorkerTag(ctx context.Context, tag *WorkerTag) error {
|
||||
if err := db.gormDB.WithContext(ctx).Save(tag).Error; err != nil {
|
||||
return workerTagError(err, "saving worker tag")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteWorkerTag deletes the given tag, after unassigning all workers from it.
|
||||
func (db *DB) DeleteWorkerTag(ctx context.Context, uuid string) error {
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Where("uuid = ?", uuid).
|
||||
Delete(&WorkerTag{})
|
||||
if tx.Error != nil {
|
||||
return workerTagError(tx.Error, "deleting worker tag")
|
||||
}
|
||||
if tx.RowsAffected == 0 {
|
||||
return ErrWorkerTagNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) FetchWorkerTags(ctx context.Context) ([]*WorkerTag, error) {
|
||||
tags := make([]*WorkerTag, 0)
|
||||
tx := db.gormDB.WithContext(ctx).Model(&WorkerTag{}).Scan(&tags)
|
||||
if tx.Error != nil {
|
||||
return nil, workerTagError(tx.Error, "fetching all worker tags")
|
||||
}
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
func (db *DB) fetchWorkerTagsWithUUID(ctx context.Context, tagUUIDs []string) ([]*WorkerTag, error) {
|
||||
tags := make([]*WorkerTag, 0)
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Model(&WorkerTag{}).
|
||||
Where("uuid in ?", tagUUIDs).
|
||||
Scan(&tags)
|
||||
if tx.Error != nil {
|
||||
return nil, workerTagError(tx.Error, "fetching all worker tags")
|
||||
}
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
func (db *DB) WorkerSetTags(ctx context.Context, worker *Worker, tagUUIDs []string) error {
|
||||
tags, err := db.fetchWorkerTagsWithUUID(ctx, tagUUIDs)
|
||||
if err != nil {
|
||||
return workerTagError(err, "fetching worker tags")
|
||||
}
|
||||
|
||||
err = db.gormDB.WithContext(ctx).
|
||||
Model(worker).
|
||||
Association("Tags").
|
||||
Replace(tags)
|
||||
if err != nil {
|
||||
return workerTagError(err, "updating worker tags")
|
||||
}
|
||||
return nil
|
||||
}
|
165
internal/manager/persistence/worker_tag_test.go
Normal file
165
internal/manager/persistence/worker_tag_test.go
Normal file
@ -0,0 +1,165 @@
|
||||
package persistence
|
||||
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.blender.org/flamenco/internal/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCreateFetchTag(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
// Test fetching non-existent tag
|
||||
fetchedTag, err := f.db.FetchWorkerTag(f.ctx, "7ee21bc8-ff1a-42d2-a6b6-cc4b529b189f")
|
||||
assert.ErrorIs(t, err, ErrWorkerTagNotFound)
|
||||
assert.Nil(t, fetchedTag)
|
||||
|
||||
// New tag creation is already done in the workerTestFixtures() call.
|
||||
assert.NotNil(t, f.tag)
|
||||
|
||||
fetchedTag, err = f.db.FetchWorkerTag(f.ctx, f.tag.UUID)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, fetchedTag)
|
||||
|
||||
// Test contents of fetched tag.
|
||||
assert.Equal(t, f.tag.UUID, fetchedTag.UUID)
|
||||
assert.Equal(t, f.tag.Name, fetchedTag.Name)
|
||||
assert.Equal(t, f.tag.Description, fetchedTag.Description)
|
||||
assert.Zero(t, fetchedTag.Workers)
|
||||
}
|
||||
|
||||
func TestFetchDeleteTags(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
// Single tag was created by fixture.
|
||||
has, err := f.db.HasWorkerTags(f.ctx)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, has, "expecting HasWorkerTags to return true")
|
||||
|
||||
secondTag := WorkerTag{
|
||||
UUID: uuid.New(),
|
||||
Name: "arbeiderstag",
|
||||
Description: "Worker tag in Dutch",
|
||||
}
|
||||
|
||||
require.NoError(t, f.db.CreateWorkerTag(f.ctx, &secondTag))
|
||||
|
||||
allTags, err := f.db.FetchWorkerTags(f.ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, allTags, 2)
|
||||
var allTagIDs [2]string
|
||||
for idx := range allTags {
|
||||
allTagIDs[idx] = allTags[idx].UUID
|
||||
}
|
||||
assert.Contains(t, allTagIDs, f.tag.UUID)
|
||||
assert.Contains(t, allTagIDs, secondTag.UUID)
|
||||
|
||||
has, err = f.db.HasWorkerTags(f.ctx)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, has, "expecting HasWorkerTags to return true")
|
||||
|
||||
// Test deleting the 2nd tag.
|
||||
require.NoError(t, f.db.DeleteWorkerTag(f.ctx, secondTag.UUID))
|
||||
|
||||
allTags, err = f.db.FetchWorkerTags(f.ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, allTags, 1)
|
||||
assert.Equal(t, f.tag.UUID, allTags[0].UUID)
|
||||
|
||||
// Test deleting the 1st tag.
|
||||
require.NoError(t, f.db.DeleteWorkerTag(f.ctx, f.tag.UUID))
|
||||
has, err = f.db.HasWorkerTags(f.ctx)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, has, "expecting HasWorkerTags to return false")
|
||||
}
|
||||
|
||||
func TestAssignUnassignWorkerTags(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
assertTags := func(msgLabel string, tagUUIDs ...string) {
|
||||
w, err := f.db.FetchWorker(f.ctx, f.worker.UUID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Catch doubly-reported tags, as the maps below would hide those cases.
|
||||
assert.Len(t, w.Tags, len(tagUUIDs), msgLabel)
|
||||
|
||||
expectTags := make(map[string]bool)
|
||||
for _, cid := range tagUUIDs {
|
||||
expectTags[cid] = true
|
||||
}
|
||||
|
||||
actualTags := make(map[string]bool)
|
||||
for _, c := range w.Tags {
|
||||
actualTags[c.UUID] = true
|
||||
}
|
||||
|
||||
assert.Equal(t, expectTags, actualTags, msgLabel)
|
||||
}
|
||||
|
||||
secondTag := WorkerTag{
|
||||
UUID: uuid.New(),
|
||||
Name: "arbeiderstag",
|
||||
Description: "Worker tag in Dutch",
|
||||
}
|
||||
|
||||
require.NoError(t, f.db.CreateWorkerTag(f.ctx, &secondTag))
|
||||
|
||||
// By default the Worker should not be part of a tag.
|
||||
assertTags("default tag assignment")
|
||||
|
||||
require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID}))
|
||||
assertTags("setting one tag", f.tag.UUID)
|
||||
|
||||
// Double assignments should also just work.
|
||||
require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID, f.tag.UUID}))
|
||||
assertTags("setting twice the same tag", f.tag.UUID)
|
||||
|
||||
// Multiple tag memberships.
|
||||
require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID, secondTag.UUID}))
|
||||
assertTags("setting two different tags", f.tag.UUID, secondTag.UUID)
|
||||
|
||||
// Remove memberships.
|
||||
require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{secondTag.UUID}))
|
||||
assertTags("unassigning from first tag", secondTag.UUID)
|
||||
require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{}))
|
||||
assertTags("unassigning from second tag")
|
||||
}
|
||||
|
||||
func TestSaveWorkerTag(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
f.tag.Name = "übertag"
|
||||
f.tag.Description = "ʻO kēlā hui ma laila"
|
||||
require.NoError(t, f.db.SaveWorkerTag(f.ctx, f.tag))
|
||||
|
||||
fetched, err := f.db.FetchWorkerTag(f.ctx, f.tag.UUID)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, f.tag.Name, fetched.Name)
|
||||
assert.Equal(t, f.tag.Description, fetched.Description)
|
||||
}
|
||||
|
||||
func TestDeleteWorkerTagWithWorkersAssigned(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
// Assign the worker.
|
||||
require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID}))
|
||||
|
||||
// Delete the tag.
|
||||
require.NoError(t, f.db.DeleteWorkerTag(f.ctx, f.tag.UUID))
|
||||
|
||||
// Check the Worker has been unassigned from the tag.
|
||||
w, err := f.db.FetchWorker(f.ctx, f.worker.UUID)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, w.Tags)
|
||||
}
|
@ -31,7 +31,7 @@ type Worker struct {
|
||||
|
||||
SupportedTaskTypes string `gorm:"type:varchar(255);default:''"` // comma-separated list of task types.
|
||||
|
||||
Clusters []*WorkerCluster `gorm:"many2many:worker_cluster_membership;constraint:OnDelete:CASCADE"`
|
||||
Tags []*WorkerTag `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"`
|
||||
}
|
||||
|
||||
func (w *Worker) Identifier() string {
|
||||
@ -73,7 +73,7 @@ func (db *DB) CreateWorker(ctx context.Context, w *Worker) error {
|
||||
func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) {
|
||||
w := Worker{}
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Preload("Clusters").
|
||||
Preload("Tags").
|
||||
First(&w, "uuid = ?", uuid)
|
||||
if tx.Error != nil {
|
||||
return nil, workerError(tx.Error, "fetching worker")
|
||||
|
@ -319,18 +319,18 @@ func TestDeleteWorker(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteWorkerWithClusterAssigned(t *testing.T) {
|
||||
func TestDeleteWorkerWithTagAssigned(t *testing.T) {
|
||||
f := workerTestFixtures(t, 1*time.Second)
|
||||
defer f.done()
|
||||
|
||||
// Assign the worker.
|
||||
require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID}))
|
||||
require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID}))
|
||||
|
||||
// Delete the Worker.
|
||||
require.NoError(t, f.db.DeleteWorker(f.ctx, f.worker.UUID))
|
||||
|
||||
// Check the Worker has been unassigned from the cluster.
|
||||
cluster, err := f.db.FetchWorkerCluster(f.ctx, f.cluster.UUID)
|
||||
// Check the Worker has been unassigned from the tag.
|
||||
tag, err := f.db.FetchWorkerTag(f.ctx, f.tag.UUID)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, cluster.Workers)
|
||||
assert.Empty(t, tag.Workers)
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate {
|
||||
workerUpdate.LastSeen = &worker.LastSeenAt
|
||||
}
|
||||
|
||||
// TODO: add cluster IDs.
|
||||
// TODO: add tag IDs.
|
||||
|
||||
return workerUpdate
|
||||
}
|
||||
|
@ -215,7 +215,6 @@ func fileCopy(src, dest string) (error, string) {
|
||||
return nil, msg
|
||||
}
|
||||
|
||||
|
||||
func fileExists(filename string) bool {
|
||||
_, err := os.Stat(filename)
|
||||
return !errors.Is(err, fs.ErrNotExist)
|
||||
|
@ -345,7 +345,6 @@ func TestCmdCopyFileDestinationExists(t *testing.T) {
|
||||
assert.Error(t, f.run())
|
||||
}
|
||||
|
||||
|
||||
func TestCmdCopyFileSourceIsDir(t *testing.T) {
|
||||
f := newCmdCopyFileFixture(t)
|
||||
defer f.finish(t)
|
||||
@ -372,7 +371,6 @@ func TestCmdCopyFileSourceIsDir(t *testing.T) {
|
||||
assert.Error(t, f.run())
|
||||
}
|
||||
|
||||
|
||||
func newCmdCopyFileFixture(t *testing.T) cmdCopyFileFixture {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
ce, mocks := testCommandExecutor(t, mockCtrl)
|
||||
|
@ -32,12 +32,17 @@
|
||||
<dt class="field-name" title="ID">ID</dt>
|
||||
<dd><span @click="copyElementText" class="click-to-copy">{{ jobData.id }}</span></dd>
|
||||
|
||||
<template v-if="workerCluster">
|
||||
<!-- TODO: fetch cluster name and show that instead, and allow editing of the cluster. -->
|
||||
<dt class="field-name" title="Worker Cluster">Cluster</dt>
|
||||
<dd :title="workerCluster.description"><span @click="copyElementData" class="click-to-copy"
|
||||
:data-clipboard="workerCluster.id">{{
|
||||
workerCluster.name }}</span></dd>
|
||||
<template v-if="workerTag">
|
||||
<!-- TODO: fetch tag name and show that instead, and allow editing of the tag. -->
|
||||
<dt class="field-name" title="Worker Tag">Tag</dt>
|
||||
<dd :title="workerTag.description">
|
||||
<span
|
||||
@click="copyElementData"
|
||||
class="click-to-copy"
|
||||
:data-clipboard="workerTag.id"
|
||||
>{{ workerTag.name }}</span
|
||||
>
|
||||
</dd>
|
||||
</template>
|
||||
|
||||
<dt class="field-name" title="Name">Name</dt>
|
||||
@ -128,8 +133,7 @@ export default {
|
||||
this._refreshJobSettings(this.jobData);
|
||||
}
|
||||
|
||||
this.workers.refreshClusters()
|
||||
.catch((error) => {
|
||||
this.workers.refreshTags().catch((error) => {
|
||||
const errorMsg = JSON.stringify(error); // TODO: handle API errors better.
|
||||
this.notifs.add(`Error: ${errorMsg}`);
|
||||
});
|
||||
@ -156,9 +160,9 @@ export default {
|
||||
}
|
||||
return this.jobData.settings;
|
||||
},
|
||||
workerCluster() {
|
||||
if (!this.jobData.worker_cluster) return undefined;
|
||||
return this.workers.clustersByID[this.jobData.worker_cluster];
|
||||
workerTag() {
|
||||
if (!this.jobData.worker_tag) return undefined;
|
||||
return this.workers.tagsByID[this.jobData.worker_tag];
|
||||
},
|
||||
},
|
||||
watch: {
|
||||
|
@ -34,21 +34,24 @@
|
||||
</dd>
|
||||
</dl>
|
||||
|
||||
<section class="worker-clusters" v-if="workers.clusters && workers.clusters.length">
|
||||
<h3 class="sub-title">Clusters</h3>
|
||||
<section class="worker-tags" v-if="workers.tags && workers.tags.length">
|
||||
<h3 class="sub-title">Tags</h3>
|
||||
<ul>
|
||||
<li v-for="cluster in workers.clusters">
|
||||
<switch-checkbox :isChecked="thisWorkerClusters[cluster.id]" :label="cluster.name" :title="cluster.description"
|
||||
@switch-toggle="toggleWorkerCluster(cluster.id)">
|
||||
<li v-for="tag in workers.tags">
|
||||
<switch-checkbox
|
||||
:isChecked="thisWorkerTags[tag.id]"
|
||||
:label="tag.name"
|
||||
:title="tag.description"
|
||||
@switch-toggle="toggleWorkerTag(tag.id)"
|
||||
>
|
||||
</switch-checkbox>
|
||||
</li>
|
||||
</ul>
|
||||
<p class="hint" v-if="hasClustersAssigned">
|
||||
This worker will only pick up jobs assigned to one of its clusters, and clusterless jobs.
|
||||
</p>
|
||||
<p class="hint" v-else>
|
||||
This worker will only pick up clusterless jobs.
|
||||
<p class="hint" v-if="hasTagsAssigned">
|
||||
This worker will only pick up jobs assigned to one of its tags, and
|
||||
tagless jobs.
|
||||
</p>
|
||||
<p class="hint" v-else>This worker will only pick up tagless jobs.</p>
|
||||
</section>
|
||||
|
||||
<section class="sleep-schedule" :class="{ 'is-schedule-active': workerSleepSchedule.is_active }">
|
||||
@ -165,15 +168,14 @@ export default {
|
||||
notifs: useNotifs(),
|
||||
copyElementText: copyElementText,
|
||||
workers: useWorkers(),
|
||||
thisWorkerClusters: {}, // Mapping from UUID to 'isAssigned' boolean.
|
||||
thisWorkerTags: {}, // Mapping from UUID to 'isAssigned' boolean.
|
||||
};
|
||||
},
|
||||
mounted() {
|
||||
// Allow testing from the JS console:
|
||||
window.workerDetailsVue = this;
|
||||
|
||||
this.workers.refreshClusters()
|
||||
.catch((error) => {
|
||||
this.workers.refreshTags().catch((error) => {
|
||||
const errorMsg = JSON.stringify(error); // TODO: handle API errors better.
|
||||
this.notifs.add(`Error: ${errorMsg}`);
|
||||
});
|
||||
@ -191,7 +193,7 @@ export default {
|
||||
this.fetchWorkerSleepSchedule();
|
||||
}
|
||||
|
||||
this.updateThisWorkerClusters(newData);
|
||||
this.updateThisWorkerTags(newData);
|
||||
},
|
||||
},
|
||||
computed: {
|
||||
@ -209,11 +211,10 @@ export default {
|
||||
},
|
||||
workerSleepScheduleStatusLabel() {
|
||||
return this.workerSleepSchedule.is_active ? 'Enabled' : 'Disabled';
|
||||
hasTagsAssigned() {
|
||||
const tagIDs = this.getAssignedTagIDs();
|
||||
return tagIDs && tagIDs.length > 0;
|
||||
},
|
||||
hasClustersAssigned() {
|
||||
const clusterIDs = this.getAssignedClusterIDs();
|
||||
return clusterIDs && clusterIDs.length > 0;
|
||||
}
|
||||
},
|
||||
methods: {
|
||||
fetchWorkerSleepSchedule() {
|
||||
@ -262,46 +263,48 @@ export default {
|
||||
}
|
||||
this.api.deleteWorker(this.workerData.id);
|
||||
},
|
||||
updateThisWorkerClusters(newWorkerData) {
|
||||
if (!newWorkerData || !newWorkerData.clusters) {
|
||||
this.thisWorkerClusters = {};
|
||||
updateThisWorkerTags(newWorkerData) {
|
||||
if (!newWorkerData || !newWorkerData.tags) {
|
||||
this.thisWorkerTags = {};
|
||||
return;
|
||||
}
|
||||
|
||||
const assignedClusters = newWorkerData.clusters.reduce(
|
||||
(accu, cluster) => { accu[cluster.id] = true; return accu; },
|
||||
{});
|
||||
this.thisWorkerClusters = assignedClusters;
|
||||
const assignedTags = newWorkerData.tags.reduce((accu, tag) => {
|
||||
accu[tag.id] = true;
|
||||
return accu;
|
||||
}, {});
|
||||
this.thisWorkerTags = assignedTags;
|
||||
},
|
||||
toggleWorkerCluster(clusterID) {
|
||||
console.log("Toggled", clusterID);
|
||||
this.thisWorkerClusters[clusterID] = !this.thisWorkerClusters[clusterID];
|
||||
console.log("New assignment:", plain(this.thisWorkerClusters))
|
||||
toggleWorkerTag(tagID) {
|
||||
console.log("Toggled", tagID);
|
||||
this.thisWorkerTags[tagID] = !this.thisWorkerTags[tagID];
|
||||
console.log("New assignment:", plain(this.thisWorkerTags));
|
||||
|
||||
// Construct cluster change request.
|
||||
const clusterIDs = this.getAssignedClusterIDs();
|
||||
const changeRequest = new WorkerClusterChangeRequest(clusterIDs);
|
||||
// Construct tag change request.
|
||||
const tagIDs = this.getAssignedTagIDs();
|
||||
const changeRequest = new WorkerTagChangeRequest(tagIDs);
|
||||
|
||||
// Send to the Manager.
|
||||
this.api.setWorkerClusters(this.workerData.id, changeRequest)
|
||||
this.api
|
||||
.setWorkerTags(this.workerData.id, changeRequest)
|
||||
.then(() => {
|
||||
this.notifs.add('Cluster assignment updated');
|
||||
this.notifs.add("Tag assignment updated");
|
||||
})
|
||||
.catch((error) => {
|
||||
const errorMsg = JSON.stringify(error); // TODO: handle API errors better.
|
||||
this.notifs.add(`Error: ${errorMsg}`);
|
||||
});
|
||||
},
|
||||
getAssignedClusterIDs() {
|
||||
const clusterIDs = [];
|
||||
for (let clusterID in this.thisWorkerClusters) {
|
||||
getAssignedTagIDs() {
|
||||
const tagIDs = [];
|
||||
for (let tagID in this.thisWorkerTags) {
|
||||
// Values can exist and be set to 'false'.
|
||||
const isAssigned = this.thisWorkerClusters[clusterID];
|
||||
if (isAssigned) clusterIDs.push(clusterID);
|
||||
}
|
||||
return clusterIDs;
|
||||
}
|
||||
const isAssigned = this.thisWorkerTags[tagID];
|
||||
if (isAssigned) tagIDs.push(tagID);
|
||||
}
|
||||
return tagIDs;
|
||||
},
|
||||
},
|
||||
};
|
||||
</script>
|
||||
|
||||
@ -377,11 +380,11 @@ export default {
|
||||
white-space: nowrap;
|
||||
}
|
||||
|
||||
.worker-clusters ul {
|
||||
.worker-tags ul {
|
||||
list-style: none;
|
||||
}
|
||||
|
||||
.worker-clusters ul li {
|
||||
.worker-tags ul li {
|
||||
margin-bottom: 0.25rem;
|
||||
}
|
||||
</style>
|
||||
|
@ -15,11 +15,11 @@ export const useWorkers = defineStore('workers', {
|
||||
*/
|
||||
activeWorkerID: "",
|
||||
|
||||
/** @type {API.WorkerCluster[]} */
|
||||
clusters: [],
|
||||
/** @type {API.WorkerTag[]} */
|
||||
tags: [],
|
||||
|
||||
/* Mapping from cluster UUID to API.WorkerCluster. */
|
||||
clustersByID: {},
|
||||
/* Mapping from tag UUID to API.WorkerTag. */
|
||||
tagsByID: {},
|
||||
}),
|
||||
actions: {
|
||||
setActiveWorkerID(workerID) {
|
||||
@ -47,22 +47,21 @@ export const useWorkers = defineStore('workers', {
|
||||
});
|
||||
},
|
||||
/**
|
||||
* Fetch the available worker clusters from the Manager.
|
||||
* Fetch the available worker tags from the Manager.
|
||||
*
|
||||
* @returns a promise.
|
||||
*/
|
||||
refreshClusters() {
|
||||
refreshTags() {
|
||||
const api = new WorkerMgtApi(getAPIClient());
|
||||
return api.fetchWorkerClusters()
|
||||
.then((resp) => {
|
||||
this.clusters = resp.clusters;
|
||||
return api.fetchWorkerTags().then((resp) => {
|
||||
this.tags = resp.tags;
|
||||
|
||||
let clustersByID = {};
|
||||
for (let cluster of this.clusters) {
|
||||
clustersByID[cluster.id] = cluster;
|
||||
let tagsByID = {};
|
||||
for (let tag of this.tags) {
|
||||
tagsByID[tag.id] = tag;
|
||||
}
|
||||
this.clustersByID = clustersByID;
|
||||
})
|
||||
this.tagsByID = tagsByID;
|
||||
});
|
||||
},
|
||||
},
|
||||
})
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user