diff --git a/flamenco/dashboard.go b/flamenco/dashboard.go index 3790aa17..1380ff48 100644 --- a/flamenco/dashboard.go +++ b/flamenco/dashboard.go @@ -159,6 +159,7 @@ func (dash *Dashboard) sendStatusReport(w http.ResponseWriter, r *http.Request) "software": 1, "status": 1, "status_requested": 1, + "lazy_status_request": 1, "supported_task_types": 1, "sleep_schedule": 1, "blacklist": 1, @@ -241,11 +242,19 @@ func (dash *Dashboard) workerAction(w http.ResponseWriter, r *http.Request) { actionHandlers := map[string]func(){ "set-status": func() { requestedStatus := r.FormValue("status") - logger = logger.WithField("requested_status", requestedStatus) - actionErr = worker.RequestStatusChange(requestedStatus, db) + lazy := Lazyness(r.FormValue("lazy") == "true") + logger = logger.WithFields(log.Fields{ + "requested_status": requestedStatus, + "lazy": lazy, + }) + actionErr = worker.RequestStatusChange(requestedStatus, lazy, db) }, "shutdown": func() { - actionErr = worker.RequestStatusChange(workerStatusShutdown, db) + lazy := Lazyness(r.FormValue("lazy") == "true") + logger = logger.WithFields(log.Fields{ + "lazy": lazy, + }) + actionErr = worker.RequestStatusChange(workerStatusShutdown, lazy, db) }, "ack-timeout": func() { actionErr = worker.AckTimeout(db) diff --git a/flamenco/documents.go b/flamenco/documents.go index 96da169b..db27ea6e 100644 --- a/flamenco/documents.go +++ b/flamenco/documents.go @@ -139,9 +139,12 @@ type Worker struct { CurrentJob bson.ObjectId `bson:"current_job,omitempty" json:"current_job,omitempty"` // For controlling sleeping & waking up. For values, see the workerStatusXXX constants. - StatusRequested string `bson:"status_requested" json:"status_requested"` - SleepSchedule ScheduleInfo `bson:"sleep_schedule,omitempty" json:"sleep_schedule"` - Blacklist []WorkerBlacklistEntry `json:"blacklist,omitempty"` + StatusRequested string `bson:"status_requested" json:"status_requested"` + LazyStatusRequest Lazyness `bson:"lazy_status_request" json:"lazy_status_request"` // Only apply requested status when current task is finished. + SleepSchedule ScheduleInfo `bson:"sleep_schedule,omitempty" json:"sleep_schedule"` + + // For preventing a failing worker from eating up all tasks of a certain job. + Blacklist []WorkerBlacklistEntry `json:"blacklist,omitempty"` } // ScheduleInfo for automatically sending a Worker to sleep & waking up. diff --git a/flamenco/scheduler.go b/flamenco/scheduler.go index f13ce1dd..945c2bb6 100644 --- a/flamenco/scheduler.go +++ b/flamenco/scheduler.go @@ -428,10 +428,15 @@ func (ts *TaskScheduler) WorkerMayRunTask(w http.ResponseWriter, r *auth.Authent db *mgo.Database, taskID bson.ObjectId) { worker, logFields := findWorkerForHTTP(w, r, db) + statusRequested := "" logFields["task_id"] = taskID.Hex() + if worker.StatusRequested != "" && worker.LazyStatusRequest == Immediate { + statusRequested = worker.StatusRequested + } if worker.StatusRequested != "" { logFields["worker_status_requested"] = worker.StatusRequested + logFields["lazy_status_request"] = worker.LazyStatusRequest } worker.SetAwake(db) worker.Seen(&r.Request, db) @@ -459,9 +464,9 @@ func (ts *TaskScheduler) WorkerMayRunTask(w http.ResponseWriter, r *auth.Authent logFields["task_status"] = task.Status log.WithFields(logFields).Warning("WorkerMayRunTask: task is in not-runnable status, worker will stop") response.Reason = fmt.Sprintf("task %s in non-runnable status %s", taskID.Hex(), task.Status) - } else if !workerStatusRunnable[worker.StatusRequested] { + } else if !workerStatusRunnable[statusRequested] { log.WithFields(logFields).Warning("WorkerMayRunTask: worker was requested to go to non-active status; will stop its current task") - response.Reason = fmt.Sprintf("worker status change to %s requested", worker.StatusRequested) + response.Reason = fmt.Sprintf("worker status change to %s requested", statusRequested) } else { response.MayKeepRunning = true WorkerPingedTask(worker.ID, taskID, "", db) diff --git a/flamenco/scheduler_test.go b/flamenco/scheduler_test.go index 22384d0f..c2898cc8 100644 --- a/flamenco/scheduler_test.go +++ b/flamenco/scheduler_test.go @@ -562,6 +562,37 @@ func (s *SchedulerTestSuite) TestWorkerMayRun(t *check.C) { assert.Equal(t, false, resp.MayKeepRunning) } +func (s *SchedulerTestSuite) TestWorkerMayRunStatusRequested(t *check.C) { + // Store task in DB. + task := ConstructTestTask("aaaaaaaaaaaaaaaaaaaaaaaa", "sleeping") + if err := s.db.C("flamenco_tasks").Insert(task); err != nil { + t.Fatal("Unable to insert test task", err) + } + + // Make sure the scheduler gives us this task. + respRec, ar := WorkerTestRequest(s.workerLnx.ID, "GET", "/task") + s.sched.ScheduleTask(respRec, ar) + + // When a lazy status change is requested, we should be allowed to keep running. + assert.Nil(t, s.workerLnx.RequestStatusChange(workerStatusAsleep, Lazy, s.db)) + respRec, ar = WorkerTestRequest(s.workerLnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + s.sched.WorkerMayRunTask(respRec, ar, s.db, task.ID) + + resp := MayKeepRunningResponse{} + parseJSON(t, respRec, 200, &resp) + assert.Equal(t, "", resp.Reason) + assert.Equal(t, true, resp.MayKeepRunning) + + // When an immediate status change is requested, we shouldn't be allowed to keep running. + assert.Nil(t, s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db)) + respRec, ar = WorkerTestRequest(s.workerLnx.ID, "GET", "/may-i-run/%s", task.ID.Hex()) + s.sched.WorkerMayRunTask(respRec, ar, s.db, task.ID) + + parseJSON(t, respRec, 200, &resp) + assert.Equal(t, "worker status change to asleep requested", resp.Reason) + assert.Equal(t, false, resp.MayKeepRunning) +} + func (s *SchedulerTestSuite) TestBlacklist(c *check.C) { // Insert a number of tasks of different type & job. job1 := bson.NewObjectId() diff --git a/flamenco/sleep_scheduler.go b/flamenco/sleep_scheduler.go index 1596a54e..7f42eb52 100644 --- a/flamenco/sleep_scheduler.go +++ b/flamenco/sleep_scheduler.go @@ -102,7 +102,7 @@ func (ss *SleepScheduler) RequestWorkerStatus(worker *Worker, db *mgo.Database) } else { logger.Info("requesting worker to switch to scheduled status") } - if err := worker.RequestStatusChange(scheduled, db); err != nil { + if err := worker.RequestStatusChange(scheduled, Immediate, db); err != nil { logger.WithError(err).Error("unable to store status change in database") } } diff --git a/flamenco/workers.go b/flamenco/workers.go index 2484b15d..dcdc14f2 100644 --- a/flamenco/workers.go +++ b/flamenco/workers.go @@ -30,6 +30,16 @@ var workerStatusRunnable = map[string]bool{ "": true, // no status change was requested } +// Lazyness indicates whether a worker's requested status change is lazy (true) or immediate (false). +type Lazyness bool + +const ( + // Immediate status change requests interrupt the currently running task. + Immediate Lazyness = false + // Lazy status change requests are applied when the currently running task finishes. + Lazy Lazyness = true +) + // Identifier returns the worker's address, with the nickname in parentheses (if set). // // Make sure that you include the nickname in the projection when you fetch @@ -72,14 +82,19 @@ func (worker *Worker) SetCurrentTask(taskID bson.ObjectId, db *mgo.Database) err } // RequestStatusChange stores the new requested status in MongoDB, so that it gets picked up -// by the worker the next time it asks for it. -func (worker *Worker) RequestStatusChange(newStatus string, db *mgo.Database) error { +// by the worker the next time it asks for it. Parameter 'lazy' indicates that the worker can +// finish the current task first, before applying the status change. +func (worker *Worker) RequestStatusChange(newStatus string, lazy Lazyness, db *mgo.Database) error { if !workerStatusRequestable[newStatus] { return fmt.Errorf("RequestStatusChange(%q): status cannot be requested", newStatus) } worker.StatusRequested = newStatus - updates := M{"status_requested": newStatus} + worker.LazyStatusRequest = lazy + updates := M{ + "status_requested": newStatus, + "lazy_status_request": lazy, + } return db.C("flamenco_workers").UpdateId(worker.ID, M{"$set": updates}) } @@ -426,7 +441,7 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo. // Update the worker itself, to show it's down in the DB too. if worker.Status == workerStatusAsleep && (worker.StatusRequested == "" || worker.StatusRequested == workerStatusShutdown) { // Make sure that the worker remains asleep, even after signing on again. - defer worker.RequestStatusChange(workerStatusAsleep, db) + defer worker.RequestStatusChange(workerStatusAsleep, Immediate, db) } // Signing off is seen as acknowledgement of the "shutdown" status. diff --git a/flamenco/workers_test.go b/flamenco/workers_test.go index f699e265..c4820cba 100644 --- a/flamenco/workers_test.go +++ b/flamenco/workers_test.go @@ -222,7 +222,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Signing off when awake and shutdown requested s.workerLnx.SetStatus(workerStatusAwake, s.db) - s.workerLnx.RequestStatusChange(workerStatusShutdown, s.db) + s.workerLnx.RequestStatusChange(workerStatusShutdown, Immediate, s.db) signoff() getworker() assert.Equal(t, workerStatusOffline, found.Status) @@ -230,7 +230,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Signing off when asleep s.workerLnx.SetStatus(workerStatusAsleep, s.db) - s.workerLnx.RequestStatusChange(workerStatusShutdown, s.db) + s.workerLnx.RequestStatusChange(workerStatusShutdown, Immediate, s.db) signoff() getworker() assert.Equal(t, workerStatusOffline, found.Status) @@ -238,7 +238,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Signing off when asleep and awake requested s.workerLnx.SetStatus(workerStatusAsleep, s.db) - s.workerLnx.RequestStatusChange(workerStatusAwake, s.db) + s.workerLnx.RequestStatusChange(workerStatusAwake, Immediate, s.db) signoff() getworker() assert.Equal(t, workerStatusOffline, found.Status) @@ -246,7 +246,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Signing off when timed out s.workerLnx.SetStatus(workerStatusTimeout, s.db) - s.workerLnx.RequestStatusChange(workerStatusShutdown, s.db) + s.workerLnx.RequestStatusChange(workerStatusShutdown, Immediate, s.db) signoff() getworker() assert.Equal(t, workerStatusOffline, found.Status) @@ -256,7 +256,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) { // Tests receiving the status change via /may-i-run and /task func (s *WorkerTestSuite) TestStatusChangeReceiving(t *check.C) { // Requesting a new status should set it both on the instance and on the database. - err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db) + err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db) assert.Nil(t, err) assert.Equal(t, workerStatusAsleep, s.workerLnx.StatusRequested) assert.Equal(t, workerStatusAwake, s.workerLnx.Status) @@ -300,7 +300,7 @@ func (s *WorkerTestSuite) TestWorkerStatusChange(t *check.C) { assert.Equal(t, http.StatusNoContent, respRec.Code) // Request a status change. - err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db) + err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db) assert.Nil(t, err) // Now we should get the change back. @@ -313,7 +313,7 @@ func (s *WorkerTestSuite) TestWorkerStatusChange(t *check.C) { } func (s *WorkerTestSuite) TestAckStatusChange(t *check.C) { - err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db) + err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db) assert.Nil(t, err) // Tests direct function call. @@ -331,7 +331,7 @@ func (s *WorkerTestSuite) TestAckStatusChange(t *check.C) { } func (s *WorkerTestSuite) TestAckStatusChangeHTTP(t *check.C) { - err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db) + err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db) assert.Nil(t, err) // Tests requested status change. @@ -377,7 +377,7 @@ func (s *WorkerTestSuite) TestTimeout(t *check.C) { func (s *WorkerTestSuite) TestStatusChangeNotRequestable(t *check.C) { teststatus := func(status string) { - err := s.workerLnx.RequestStatusChange(status, s.db) + err := s.workerLnx.RequestStatusChange(status, Immediate, s.db) assert.NotNil(t, err) assert.Equal(t, "", s.workerLnx.StatusRequested) assert.Equal(t, workerStatusAwake, s.workerLnx.Status) diff --git a/static/dashboard.css b/static/dashboard.css index 36954a22..d4d35a94 100644 --- a/static/dashboard.css +++ b/static/dashboard.css @@ -116,6 +116,12 @@ button#downloadkick { border-bottom: none; } +.worker-action:hover { + text-shadow: + 0 0 1px black, + 0 0 3px fuchsia; +} + .worker-action.forget-worker { font-size: 85%; vertical-align: super; diff --git a/static/dashboard.js b/static/dashboard.js index 8abf8593..70393d33 100644 --- a/static/dashboard.js +++ b/static/dashboard.js @@ -12,19 +12,36 @@ var load_workers_timeout_handle; * the action should not be available to workers already in that status. */ WORKER_ACTIONS = Object.freeze({ - offline: { - label: '✝ Shut Down', + offline_lazy: { + label: '✝ Shut Down (after task is finished)', icon: '✝', - title: 'The worker may automatically restart.', - payload: { action: 'shutdown' }, + title: 'Shut down the worker after the current task finishes. The worker may automatically restart.', + payload: { action: 'shutdown', lazy: true }, + available(worker_status) { return worker_status != 'offline'; }, }, - asleep: { - label: '😴 Send to Sleep', + offline_immediate: { + label: '✝! Shut Down (immediately)', + icon: '✝!', + title: 'Immediately shut down the worker. It may automatically restart.', + payload: { action: 'shutdown', lazy: false }, + available(worker_status) { return false }, + }, + asleep_lazy: { + label: '😴 Send to Sleep (after task is finished)', icon: '😴', - title: 'Let the worker sleep', - payload: { action: 'set-status', status: 'asleep' }, + title: 'Let the worker sleep after finishing this task.', + payload: { action: 'set-status', status: 'asleep', lazy: true }, available(worker_status) { - return worker_status != 'timeout'; + return worker_status != 'timeout' && worker_status != 'asleep'; + }, + }, + asleep_immediate: { + label: '😴! Send to Sleep (immediately)', + icon: '😴!', + title: 'Let the worker sleep immediately.', + payload: { action: 'set-status', status: 'asleep', lazy: false }, + available(worker_status) { + return worker_status != 'timeout' && worker_status != 'asleep'; }, }, wakeup: { @@ -32,7 +49,9 @@ WORKER_ACTIONS = Object.freeze({ icon: '😃', title: 'Wake the worker up. A sleeping worker can take a minute to respond.', payload: { action: 'set-status', status: 'awake' }, - available(worker_status, requested_status) { return worker_status == 'asleep' || requested_status == 'asleep'; }, + available(worker_status, requested_status) { + return worker_status == 'asleep' || requested_status == 'asleep'; + }, }, ack_timeout: { label: '✓ Acknowledge Timeout', diff --git a/static/vue-components.html b/static/vue-components.html index a887db71..c903d0f3 100644 --- a/static/vue-components.html +++ b/static/vue-components.html @@ -37,6 +37,7 @@ @@ -137,7 +138,10 @@ {{ worker.status || '-none-' }} - → {{ worker.status_requested}} + + + + {{ worker.status_requested}}