From cfe561c79e755d0e58e9a79ef707fca35701dbee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 18 Dec 2018 10:51:55 +0100 Subject: [PATCH] Fix T58779: allow lazy status change requests Status changes can now be marked as 'lazy', in which case they are only applied when the worker has finished its current task. This only required changes to the 'may-I-run' endpoint; it now ignores lazy requests. --- flamenco/dashboard.go | 15 +++++++++++--- flamenco/documents.go | 9 ++++++--- flamenco/scheduler.go | 9 +++++++-- flamenco/scheduler_test.go | 31 +++++++++++++++++++++++++++++ flamenco/sleep_scheduler.go | 2 +- flamenco/workers.go | 23 ++++++++++++++++++---- flamenco/workers_test.go | 18 ++++++++--------- static/dashboard.css | 6 ++++++ static/dashboard.js | 39 +++++++++++++++++++++++++++---------- static/vue-components.html | 6 +++++- 10 files changed, 125 insertions(+), 33 deletions(-) diff --git a/flamenco/dashboard.go b/flamenco/dashboard.go index 3790aa17..1380ff48 100644 --- a/flamenco/dashboard.go +++ b/flamenco/dashboard.go @@ -159,6 +159,7 @@ func (dash *Dashboard) sendStatusReport(w http.ResponseWriter, r *http.Request) "software": 1, "status": 1, "status_requested": 1, + "lazy_status_request": 1, "supported_task_types": 1, "sleep_schedule": 1, "blacklist": 1, @@ -241,11 +242,19 @@ func (dash *Dashboard) workerAction(w http.ResponseWriter, r *http.Request) { actionHandlers := map[string]func(){ "set-status": func() { requestedStatus := r.FormValue("status") - logger = logger.WithField("requested_status", requestedStatus) - actionErr = worker.RequestStatusChange(requestedStatus, db) + lazy := Lazyness(r.FormValue("lazy") == "true") + logger = logger.WithFields(log.Fields{ + "requested_status": requestedStatus, + "lazy": lazy, + }) + actionErr = worker.RequestStatusChange(requestedStatus, lazy, db) }, "shutdown": func() { - actionErr = worker.RequestStatusChange(workerStatusShutdown, db) + lazy := Lazyness(r.FormValue("lazy") == "true") + logger = logger.WithFields(log.Fields{ + "lazy": lazy, + }) + actionErr = worker.RequestStatusChange(workerStatusShutdown, lazy, db) }, "ack-timeout": func() { actionErr = worker.AckTimeout(db) diff --git a/flamenco/documents.go b/flamenco/documents.go index 96da169b..db27ea6e 100644 --- a/flamenco/documents.go +++ b/flamenco/documents.go @@ -139,9 +139,12 @@ type Worker struct { CurrentJob bson.ObjectId `bson:"current_job,omitempty" json:"current_job,omitempty"` // For controlling sleeping & waking up. For values, see the workerStatusXXX constants. - StatusRequested string `bson:"status_requested" json:"status_requested"` - SleepSchedule ScheduleInfo `bson:"sleep_schedule,omitempty" json:"sleep_schedule"` - Blacklist []WorkerBlacklistEntry `json:"blacklist,omitempty"` + StatusRequested string `bson:"status_requested" json:"status_requested"` + LazyStatusRequest Lazyness `bson:"lazy_status_request" json:"lazy_status_request"` // Only apply requested status when current task is finished. + SleepSchedule ScheduleInfo `bson:"sleep_schedule,omitempty" json:"sleep_schedule"` + + // For preventing a failing worker from eating up all tasks of a certain job. + Blacklist []WorkerBlacklistEntry `json:"blacklist,omitempty"` } // ScheduleInfo for automatically sending a Worker to sleep & waking up. diff --git a/flamenco/scheduler.go b/flamenco/scheduler.go index f13ce1dd..945c2bb6 100644 --- a/flamenco/scheduler.go +++ b/flamenco/scheduler.go @@ -428,10 +428,15 @@ func (ts *TaskScheduler) WorkerMayRunTask(w http.ResponseWriter, r *auth.Authent db *mgo.Database, taskID bson.ObjectId) { worker, logFields := findWorkerForHTTP(w, r, db) + statusRequested := "" logFields["task_id"] = taskID.Hex() + if worker.StatusRequested != "" && worker.LazyStatusRequest == Immediate { + statusRequested = worker.StatusRequested + } if worker.StatusRequested != "" { logFields["worker_status_requested"] = worker.StatusRequested + logFields["lazy_status_request"] = worker.LazyStatusRequest } worker.SetAwake(db) worker.Seen(&r.Request, db) @@ -459,9 +464,9 @@ func (ts *TaskScheduler) WorkerMayRunTask(w http.ResponseWriter, r *auth.Authent logFields["task_status"] = task.Status log.WithFields(logFields).Warning("WorkerMayRunTask: task is in not-runnable status, worker will stop") response.Reason = fmt.Sprintf("task %s in non-runnable status %s", taskID.Hex(), task.Status) - } else if !workerStatusRunnable[worker.StatusRequested] { + } else if !workerStatusRunnable[statusRequested] { log.WithFields(logFields).Warning("WorkerMayRunTask: worker was requested to go to non-active status; will stop its current task") - response.Reason = fmt.Sprintf("worker status change to %s requested", worker.StatusRequested) + response.Reason = fmt.Sprintf("worker status change to %s requested", statusRequested) } else { response.MayKeepRunning = true WorkerPingedTask(worker.ID, taskID, "", db) diff --git a/flamenco/scheduler_test.go b/flamenco/scheduler_test.go index 22384d0f..c2898cc8 100644 --- a/flamenco/scheduler_test.go +++ b/flamenco/scheduler_test.go @@ -562,6 +562,37 @@ func (s *SchedulerTestSuite) TestWorkerMayRun(t *check.C) { assert.Equal(t, false, resp.MayKeepRunning) } +func (s *SchedulerTestSuite) TestWorkerMayRunStatusRequested(t *check.C) { + // Store task in DB. + task := ConstructTestTask("aaaaaaaaaaaaaaaaaaaaaaaa", "sleeping") + if err := s.db.C("flamenco_tasks").Insert(task); err != nil { + t.Fatal("Unable to insert test task", err) + } + + // Make sure the scheduler gives us this task. + respRec, ar := WorkerTestRequest(s.workerLnx.ID, "GET", "/task") + s.sched.ScheduleTask(respRec, ar) + + // When a lazy status change is requested, we should be allowed to keep running. + assert.Nil(t, s.workerLnx.RequestStatusChange(workerStatusAsleep, Lazy, s.db)) + respRec, ar = WorkerTestRequest(s.workerLnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + s.sched.WorkerMayRunTask(respRec, ar, s.db, task.ID) + + resp := MayKeepRunningResponse{} + parseJSON(t, respRec, 200, &resp) + assert.Equal(t, "", resp.Reason) + assert.Equal(t, true, resp.MayKeepRunning) + + // When an immediate status change is requested, we shouldn't be allowed to keep running. + assert.Nil(t, s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db)) + respRec, ar = WorkerTestRequest(s.workerLnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + s.sched.WorkerMayRunTask(respRec, ar, s.db, task.ID) + + parseJSON(t, respRec, 200, &resp) + assert.Equal(t, "worker status change to asleep requested", resp.Reason) + assert.Equal(t, false, resp.MayKeepRunning) +} + func (s *SchedulerTestSuite) TestBlacklist(c *check.C) { // Insert a number of tasks of different type & job. job1 := bson.NewObjectId() diff --git a/flamenco/sleep_scheduler.go b/flamenco/sleep_scheduler.go index 1596a54e..7f42eb52 100644 --- a/flamenco/sleep_scheduler.go +++ b/flamenco/sleep_scheduler.go @@ -102,7 +102,7 @@ func (ss *SleepScheduler) RequestWorkerStatus(worker *Worker, db *mgo.Database) } else { logger.Info("requesting worker to switch to scheduled status") } - if err := worker.RequestStatusChange(scheduled, db); err != nil { + if err := worker.RequestStatusChange(scheduled, Immediate, db); err != nil { logger.WithError(err).Error("unable to store status change in database") } } diff --git a/flamenco/workers.go b/flamenco/workers.go index 2484b15d..dcdc14f2 100644 --- a/flamenco/workers.go +++ b/flamenco/workers.go @@ -30,6 +30,16 @@ var workerStatusRunnable = map[string]bool{ "": true, // no status change was requested } +// Lazyness indicates whether a worker's requested status change is lazy (true) or immediate (false). +type Lazyness bool + +const ( + // Immediate status change requests interrupt the currently running task. + Immediate Lazyness = false + // Lazy status change requests are applied when the currently running task finishes. + Lazy Lazyness = true +) + // Identifier returns the worker's address, with the nickname in parentheses (if set). // // Make sure that you include the nickname in the projection when you fetch @@ -72,14 +82,19 @@ func (worker *Worker) SetCurrentTask(taskID bson.ObjectId, db *mgo.Database) err } // RequestStatusChange stores the new requested status in MongoDB, so that it gets picked up -// by the worker the next time it asks for it. -func (worker *Worker) RequestStatusChange(newStatus string, db *mgo.Database) error { +// by the worker the next time it asks for it. Parameter 'lazy' indicates that the worker can +// finish the current task first, before applying the status change. +func (worker *Worker) RequestStatusChange(newStatus string, lazy Lazyness, db *mgo.Database) error { if !workerStatusRequestable[newStatus] { return fmt.Errorf("RequestStatusChange(%q): status cannot be requested", newStatus) } worker.StatusRequested = newStatus - updates := M{"status_requested": newStatus} + worker.LazyStatusRequest = lazy + updates := M{ + "status_requested": newStatus, + "lazy_status_request": lazy, + } return db.C("flamenco_workers").UpdateId(worker.ID, M{"$set": updates}) } @@ -426,7 +441,7 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo. // Update the worker itself, to show it's down in the DB too. if worker.Status == workerStatusAsleep && (worker.StatusRequested == "" || worker.StatusRequested == workerStatusShutdown) { // Make sure that the worker remains asleep, even after signing on again. - defer worker.RequestStatusChange(workerStatusAsleep, db) + defer worker.RequestStatusChange(workerStatusAsleep, Immediate, db) } // Signing off is seen as acknowledgement of the "shutdown" status. diff --git a/flamenco/workers_test.go b/flamenco/workers_test.go index f699e265..c4820cba 100644 --- a/flamenco/workers_test.go +++ b/flamenco/workers_test.go @@ -222,7 +222,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Signing off when awake and shutdown requested s.workerLnx.SetStatus(workerStatusAwake, s.db) - s.workerLnx.RequestStatusChange(workerStatusShutdown, s.db) + s.workerLnx.RequestStatusChange(workerStatusShutdown, Immediate, s.db) signoff() getworker() assert.Equal(t, workerStatusOffline, found.Status) @@ -230,7 +230,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Signing off when asleep s.workerLnx.SetStatus(workerStatusAsleep, s.db) - s.workerLnx.RequestStatusChange(workerStatusShutdown, s.db) + s.workerLnx.RequestStatusChange(workerStatusShutdown, Immediate, s.db) signoff() getworker() assert.Equal(t, workerStatusOffline, found.Status) @@ -238,7 +238,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Signing off when asleep and awake requested s.workerLnx.SetStatus(workerStatusAsleep, s.db) - s.workerLnx.RequestStatusChange(workerStatusAwake, s.db) + s.workerLnx.RequestStatusChange(workerStatusAwake, Immediate, s.db) signoff() getworker() assert.Equal(t, workerStatusOffline, found.Status) @@ -246,7 +246,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Signing off when timed out s.workerLnx.SetStatus(workerStatusTimeout, s.db) - s.workerLnx.RequestStatusChange(workerStatusShutdown, s.db) + s.workerLnx.RequestStatusChange(workerStatusShutdown, Immediate, s.db) signoff() getworker() assert.Equal(t, workerStatusOffline, found.Status) @@ -256,7 +256,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Tests receiving the status change via /may-i-run and /task func (s *WorkerTestSuite) TestStatusChangeReceiving(t *check.C) { // Requesting a new status should set it both on the instance and on the database. - err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db) + err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db) assert.Nil(t, err) assert.Equal(t, workerStatusAsleep, s.workerLnx.StatusRequested) assert.Equal(t, workerStatusAwake, s.workerLnx.Status) @@ -300,7 +300,7 @@ func (s *WorkerTestSuite) TestWorkerStatusChange(t *check.C) { assert.Equal(t, http.StatusNoContent, respRec.Code) // Request a status change. - err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db) + err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db) assert.Nil(t, err) // Now we should get the change back. @@ -313,7 +313,7 @@ func (s *WorkerTestSuite) TestWorkerStatusChange(t *check.C) { } func (s *WorkerTestSuite) TestAckStatusChange(t *check.C) { - err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db) + err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db) assert.Nil(t, err) // Tests direct function call. @@ -331,7 +331,7 @@ func (s *WorkerTestSuite) TestAckStatusChange(t *check.C) { } func (s *WorkerTestSuite) TestAckStatusChangeHTTP(t *check.C) { - err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db) + err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db) assert.Nil(t, err) // Tests requested status change. @@ -377,7 +377,7 @@ func (s *WorkerTestSuite) TestTimeout(t *check.C) { func (s *WorkerTestSuite) TestStatusChangeNotRequestable(t *check.C) { teststatus := func(status string) { - err := s.workerLnx.RequestStatusChange(status, s.db) + err := s.workerLnx.RequestStatusChange(status, Immediate, s.db) assert.NotNil(t, err) assert.Equal(t, "", s.workerLnx.StatusRequested) assert.Equal(t, workerStatusAwake, s.workerLnx.Status) diff --git a/static/dashboard.css b/static/dashboard.css index 36954a22..d4d35a94 100644 --- a/static/dashboard.css +++ b/static/dashboard.css @@ -116,6 +116,12 @@ button#downloadkick { border-bottom: none; } +.worker-action:hover { + text-shadow: + 0 0 1px black, + 0 0 3px fuchsia; +} + .worker-action.forget-worker { font-size: 85%; vertical-align: super; diff --git a/static/dashboard.js b/static/dashboard.js index 8abf8593..70393d33 100644 --- a/static/dashboard.js +++ b/static/dashboard.js @@ -12,19 +12,36 @@ var load_workers_timeout_handle; * the action should not be available to workers already in that status. */ WORKER_ACTIONS = Object.freeze({ - offline: { - label: '✝ Shut Down', + offline_lazy: { + label: '✝ Shut Down (after task is finished)', icon: '✝', - title: 'The worker may automatically restart.', - payload: { action: 'shutdown' }, + title: 'Shut down the worker after the current task finishes. The worker may automatically restart.', + payload: { action: 'shutdown', lazy: true }, + available(worker_status) { return worker_status != 'offline'; }, }, - asleep: { - label: '😴 Send to Sleep', + offline_immediate: { + label: '✝! Shut Down (immediately)', + icon: '✝!', + title: 'Immediately shut down the worker. It may automatically restart.', + payload: { action: 'shutdown', lazy: false }, + available(worker_status) { return false }, + }, + asleep_lazy: { + label: '😴 Send to Sleep (after task is finished)', icon: '😴', - title: 'Let the worker sleep', - payload: { action: 'set-status', status: 'asleep' }, + title: 'Let the worker sleep after finishing this task.', + payload: { action: 'set-status', status: 'asleep', lazy: true }, available(worker_status) { - return worker_status != 'timeout'; + return worker_status != 'timeout' && worker_status != 'asleep'; + }, + }, + asleep_immediate: { + label: '😴! Send to Sleep (immediately)', + icon: '😴!', + title: 'Let the worker sleep immediately.', + payload: { action: 'set-status', status: 'asleep', lazy: false }, + available(worker_status) { + return worker_status != 'timeout' && worker_status != 'asleep'; }, }, wakeup: { @@ -32,7 +49,9 @@ WORKER_ACTIONS = Object.freeze({ icon: '😃', title: 'Wake the worker up. A sleeping worker can take a minute to respond.', payload: { action: 'set-status', status: 'awake' }, - available(worker_status, requested_status) { return worker_status == 'asleep' || requested_status == 'asleep'; }, + available(worker_status, requested_status) { + return worker_status == 'asleep' || requested_status == 'asleep'; + }, }, ack_timeout: { label: '✓ Acknowledge Timeout', diff --git a/static/vue-components.html b/static/vue-components.html index a887db71..c903d0f3 100644 --- a/static/vue-components.html +++ b/static/vue-components.html @@ -37,6 +37,7 @@ @@ -137,7 +138,10 @@ {{ worker.status || '-none-' }} - → {{ worker.status_requested}} + + + + {{ worker.status_requested}}