Fix T58779: allow lazy status change requests

Status changes can now be marked as 'lazy', in which case they are only
applied when the worker has finished its current task. This only required
changes to the 'may-I-run' endpoint; it now ignores lazy requests, so a
lazy request does not interrupt the task that is currently running.
This commit is contained in:
2018-12-18 10:51:55 +01:00
parent e64ffe098d
commit cfe561c79e
10 changed files with 125 additions and 33 deletions
+12 -3
View File
@@ -159,6 +159,7 @@ func (dash *Dashboard) sendStatusReport(w http.ResponseWriter, r *http.Request)
"software": 1,
"status": 1,
"status_requested": 1,
"lazy_status_request": 1,
"supported_task_types": 1,
"sleep_schedule": 1,
"blacklist": 1,
@@ -241,11 +242,19 @@ func (dash *Dashboard) workerAction(w http.ResponseWriter, r *http.Request) {
actionHandlers := map[string]func(){
"set-status": func() {
requestedStatus := r.FormValue("status")
logger = logger.WithField("requested_status", requestedStatus)
actionErr = worker.RequestStatusChange(requestedStatus, db)
lazy := Lazyness(r.FormValue("lazy") == "true")
logger = logger.WithFields(log.Fields{
"requested_status": requestedStatus,
"lazy": lazy,
})
actionErr = worker.RequestStatusChange(requestedStatus, lazy, db)
},
"shutdown": func() {
actionErr = worker.RequestStatusChange(workerStatusShutdown, db)
lazy := Lazyness(r.FormValue("lazy") == "true")
logger = logger.WithFields(log.Fields{
"lazy": lazy,
})
actionErr = worker.RequestStatusChange(workerStatusShutdown, lazy, db)
},
"ack-timeout": func() {
actionErr = worker.AckTimeout(db)
+6 -3
View File
@@ -139,9 +139,12 @@ type Worker struct {
CurrentJob bson.ObjectId `bson:"current_job,omitempty" json:"current_job,omitempty"`
// For controlling sleeping & waking up. For values, see the workerStatusXXX constants.
StatusRequested string `bson:"status_requested" json:"status_requested"`
SleepSchedule ScheduleInfo `bson:"sleep_schedule,omitempty" json:"sleep_schedule"`
Blacklist []WorkerBlacklistEntry `json:"blacklist,omitempty"`
StatusRequested string `bson:"status_requested" json:"status_requested"`
LazyStatusRequest Lazyness `bson:"lazy_status_request" json:"lazy_status_request"` // Only apply requested status when current task is finished.
SleepSchedule ScheduleInfo `bson:"sleep_schedule,omitempty" json:"sleep_schedule"`
// For preventing a failing worker from eating up all tasks of a certain job.
Blacklist []WorkerBlacklistEntry `json:"blacklist,omitempty"`
}
// ScheduleInfo for automatically sending a Worker to sleep & waking up.
+7 -2
View File
@@ -428,10 +428,15 @@ func (ts *TaskScheduler) WorkerMayRunTask(w http.ResponseWriter, r *auth.Authent
db *mgo.Database, taskID bson.ObjectId) {
worker, logFields := findWorkerForHTTP(w, r, db)
statusRequested := ""
logFields["task_id"] = taskID.Hex()
if worker.StatusRequested != "" && worker.LazyStatusRequest == Immediate {
statusRequested = worker.StatusRequested
}
if worker.StatusRequested != "" {
logFields["worker_status_requested"] = worker.StatusRequested
logFields["lazy_status_request"] = worker.LazyStatusRequest
}
worker.SetAwake(db)
worker.Seen(&r.Request, db)
@@ -459,9 +464,9 @@ func (ts *TaskScheduler) WorkerMayRunTask(w http.ResponseWriter, r *auth.Authent
logFields["task_status"] = task.Status
log.WithFields(logFields).Warning("WorkerMayRunTask: task is in not-runnable status, worker will stop")
response.Reason = fmt.Sprintf("task %s in non-runnable status %s", taskID.Hex(), task.Status)
} else if !workerStatusRunnable[worker.StatusRequested] {
} else if !workerStatusRunnable[statusRequested] {
log.WithFields(logFields).Warning("WorkerMayRunTask: worker was requested to go to non-active status; will stop its current task")
response.Reason = fmt.Sprintf("worker status change to %s requested", worker.StatusRequested)
response.Reason = fmt.Sprintf("worker status change to %s requested", statusRequested)
} else {
response.MayKeepRunning = true
WorkerPingedTask(worker.ID, taskID, "", db)
+31
View File
@@ -562,6 +562,37 @@ func (s *SchedulerTestSuite) TestWorkerMayRun(t *check.C) {
assert.Equal(t, false, resp.MayKeepRunning)
}
// TestWorkerMayRunStatusRequested checks how the may-I-run endpoint reacts to a
// pending status change request: a lazy request must let the worker keep running
// its task, whereas an immediate request must tell it to stop.
func (s *SchedulerTestSuite) TestWorkerMayRunStatusRequested(t *check.C) {
// Store task in DB.
task := ConstructTestTask("aaaaaaaaaaaaaaaaaaaaaaaa", "sleeping")
if err := s.db.C("flamenco_tasks").Insert(task); err != nil {
t.Fatal("Unable to insert test task", err)
}
// Make sure the scheduler gives us this task.
respRec, ar := WorkerTestRequest(s.workerLnx.ID, "GET", "/task")
s.sched.ScheduleTask(respRec, ar)
// When a lazy status change is requested, we should be allowed to keep running.
assert.Nil(t, s.workerLnx.RequestStatusChange(workerStatusAsleep, Lazy, s.db))
respRec, ar = WorkerTestRequest(s.workerLnx.ID, "GET", "/may-i-run/%s", task.ID.Hex())
s.sched.WorkerMayRunTask(respRec, ar, s.db, task.ID)
resp := MayKeepRunningResponse{}
parseJSON(t, respRec, 200, &resp)
// An empty reason together with MayKeepRunning == true means the lazy request was not reported.
assert.Equal(t, "", resp.Reason)
assert.Equal(t, true, resp.MayKeepRunning)
// When an immediate status change is requested, we shouldn't be allowed to keep running.
assert.Nil(t, s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db))
respRec, ar = WorkerTestRequest(s.workerLnx.ID, "GET", "/may-i-run/%s", task.ID.Hex())
s.sched.WorkerMayRunTask(respRec, ar, s.db, task.ID)
// NOTE(review): resp is reused from the lazy case above; this presumably relies on
// parseJSON overwriting both fields — confirm against parseJSON's implementation.
parseJSON(t, respRec, 200, &resp)
assert.Equal(t, "worker status change to asleep requested", resp.Reason)
assert.Equal(t, false, resp.MayKeepRunning)
}
func (s *SchedulerTestSuite) TestBlacklist(c *check.C) {
// Insert a number of tasks of different type & job.
job1 := bson.NewObjectId()
+1 -1
View File
@@ -102,7 +102,7 @@ func (ss *SleepScheduler) RequestWorkerStatus(worker *Worker, db *mgo.Database)
} else {
logger.Info("requesting worker to switch to scheduled status")
}
if err := worker.RequestStatusChange(scheduled, db); err != nil {
if err := worker.RequestStatusChange(scheduled, Immediate, db); err != nil {
logger.WithError(err).Error("unable to store status change in database")
}
}
+19 -4
View File
@@ -30,6 +30,16 @@ var workerStatusRunnable = map[string]bool{
"": true, // no status change was requested
}
// Lazyness indicates whether a worker's requested status change is lazy (true) or immediate (false).
//
// Because the underlying type is bool, the zero value equals Immediate: a value
// that was never explicitly set is treated as an immediate request.
type Lazyness bool
const (
// Immediate status change requests interrupt the currently running task.
// WorkerMayRunTask only reports a requested status to the worker when the
// request is Immediate.
Immediate Lazyness = false
// Lazy status change requests are applied when the currently running task finishes.
Lazy Lazyness = true
)
// Identifier returns the worker's address, with the nickname in parentheses (if set).
//
// Make sure that you include the nickname in the projection when you fetch
@@ -72,14 +82,19 @@ func (worker *Worker) SetCurrentTask(taskID bson.ObjectId, db *mgo.Database) err
}
// RequestStatusChange stores the new requested status in MongoDB, so that it gets picked up
// by the worker the next time it asks for it.
func (worker *Worker) RequestStatusChange(newStatus string, db *mgo.Database) error {
// by the worker the next time it asks for it. Parameter 'lazy' indicates that the worker can
// finish the current task first, before applying the status change.
func (worker *Worker) RequestStatusChange(newStatus string, lazy Lazyness, db *mgo.Database) error {
if !workerStatusRequestable[newStatus] {
return fmt.Errorf("RequestStatusChange(%q): status cannot be requested", newStatus)
}
worker.StatusRequested = newStatus
updates := M{"status_requested": newStatus}
worker.LazyStatusRequest = lazy
updates := M{
"status_requested": newStatus,
"lazy_status_request": lazy,
}
return db.C("flamenco_workers").UpdateId(worker.ID, M{"$set": updates})
}
@@ -426,7 +441,7 @@ func WorkerSignOff(w http.ResponseWriter, r *auth.AuthenticatedRequest, db *mgo.
// Update the worker itself, to show it's down in the DB too.
if worker.Status == workerStatusAsleep && (worker.StatusRequested == "" || worker.StatusRequested == workerStatusShutdown) {
// Make sure that the worker remains asleep, even after signing on again.
defer worker.RequestStatusChange(workerStatusAsleep, db)
defer worker.RequestStatusChange(workerStatusAsleep, Immediate, db)
}
// Signing off is seen as acknowledgement of the "shutdown" status.
+9 -9
View File
@@ -222,7 +222,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) {
// Signing off when awake and shutdown requested
s.workerLnx.SetStatus(workerStatusAwake, s.db)
s.workerLnx.RequestStatusChange(workerStatusShutdown, s.db)
s.workerLnx.RequestStatusChange(workerStatusShutdown, Immediate, s.db)
signoff()
getworker()
assert.Equal(t, workerStatusOffline, found.Status)
@@ -230,7 +230,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) {
// Signing off when asleep
s.workerLnx.SetStatus(workerStatusAsleep, s.db)
s.workerLnx.RequestStatusChange(workerStatusShutdown, s.db)
s.workerLnx.RequestStatusChange(workerStatusShutdown, Immediate, s.db)
signoff()
getworker()
assert.Equal(t, workerStatusOffline, found.Status)
@@ -238,7 +238,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) {
// Signing off when asleep and awake requested
s.workerLnx.SetStatus(workerStatusAsleep, s.db)
s.workerLnx.RequestStatusChange(workerStatusAwake, s.db)
s.workerLnx.RequestStatusChange(workerStatusAwake, Immediate, s.db)
signoff()
getworker()
assert.Equal(t, workerStatusOffline, found.Status)
@@ -246,7 +246,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) {
// Signing off when timed out
s.workerLnx.SetStatus(workerStatusTimeout, s.db)
s.workerLnx.RequestStatusChange(workerStatusShutdown, s.db)
s.workerLnx.RequestStatusChange(workerStatusShutdown, Immediate, s.db)
signoff()
getworker()
assert.Equal(t, workerStatusOffline, found.Status)
@@ -256,7 +256,7 @@ func (s *WorkerTestSuite) TestWorkerSignOff(t *check.C) {
// Tests receiving the status change via /may-i-run and /task
func (s *WorkerTestSuite) TestStatusChangeReceiving(t *check.C) {
// Requesting a new status should set it both on the instance and on the database.
err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db)
err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db)
assert.Nil(t, err)
assert.Equal(t, workerStatusAsleep, s.workerLnx.StatusRequested)
assert.Equal(t, workerStatusAwake, s.workerLnx.Status)
@@ -300,7 +300,7 @@ func (s *WorkerTestSuite) TestWorkerStatusChange(t *check.C) {
assert.Equal(t, http.StatusNoContent, respRec.Code)
// Request a status change.
err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db)
err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db)
assert.Nil(t, err)
// Now we should get the change back.
@@ -313,7 +313,7 @@ func (s *WorkerTestSuite) TestWorkerStatusChange(t *check.C) {
}
func (s *WorkerTestSuite) TestAckStatusChange(t *check.C) {
err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db)
err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db)
assert.Nil(t, err)
// Tests direct function call.
@@ -331,7 +331,7 @@ func (s *WorkerTestSuite) TestAckStatusChange(t *check.C) {
}
func (s *WorkerTestSuite) TestAckStatusChangeHTTP(t *check.C) {
err := s.workerLnx.RequestStatusChange(workerStatusAsleep, s.db)
err := s.workerLnx.RequestStatusChange(workerStatusAsleep, Immediate, s.db)
assert.Nil(t, err)
// Tests requested status change.
@@ -377,7 +377,7 @@ func (s *WorkerTestSuite) TestTimeout(t *check.C) {
func (s *WorkerTestSuite) TestStatusChangeNotRequestable(t *check.C) {
teststatus := func(status string) {
err := s.workerLnx.RequestStatusChange(status, s.db)
err := s.workerLnx.RequestStatusChange(status, Immediate, s.db)
assert.NotNil(t, err)
assert.Equal(t, "", s.workerLnx.StatusRequested)
assert.Equal(t, workerStatusAwake, s.workerLnx.Status)
+6
View File
@@ -116,6 +116,12 @@ button#downloadkick {
border-bottom: none;
}
/* Hover feedback for elements with the 'worker-action' class: a tight black
 * outline plus a wider fuchsia glow around the text. */
.worker-action:hover {
text-shadow:
0 0 1px black,
0 0 3px fuchsia;
}
.worker-action.forget-worker {
font-size: 85%;
vertical-align: super;
+29 -10
View File
@@ -12,19 +12,36 @@ var load_workers_timeout_handle;
* the action should not be available to workers already in that status.
*/
WORKER_ACTIONS = Object.freeze({
offline: {
label: '✝ Shut Down',
offline_lazy: {
label: '✝ Shut Down (after task is finished)',
icon: '✝',
title: 'The worker may automatically restart.',
payload: { action: 'shutdown' },
title: 'Shut down the worker after the current task finishes. The worker may automatically restart.',
payload: { action: 'shutdown', lazy: true },
available(worker_status) { return worker_status != 'offline'; },
},
asleep: {
label: '😴 Send to Sleep',
offline_immediate: {
label: '✝! Shut Down (immediately)',
icon: '✝!',
title: 'Immediately shut down the worker. It may automatically restart.',
payload: { action: 'shutdown', lazy: false },
available(worker_status) { return false },
},
asleep_lazy: {
label: '😴 Send to Sleep (after task is finished)',
icon: '😴',
title: 'Let the worker sleep',
payload: { action: 'set-status', status: 'asleep' },
title: 'Let the worker sleep after finishing this task.',
payload: { action: 'set-status', status: 'asleep', lazy: true },
available(worker_status) {
return worker_status != 'timeout';
return worker_status != 'timeout' && worker_status != 'asleep';
},
},
asleep_immediate: {
label: '😴! Send to Sleep (immediately)',
icon: '😴!',
title: 'Let the worker sleep immediately.',
payload: { action: 'set-status', status: 'asleep', lazy: false },
available(worker_status) {
return worker_status != 'timeout' && worker_status != 'asleep';
},
},
wakeup: {
@@ -32,7 +49,9 @@ WORKER_ACTIONS = Object.freeze({
icon: '😃',
title: 'Wake the worker up. A sleeping worker can take a minute to respond.',
payload: { action: 'set-status', status: 'awake' },
available(worker_status, requested_status) { return worker_status == 'asleep' || requested_status == 'asleep'; },
available(worker_status, requested_status) {
return worker_status == 'asleep' || requested_status == 'asleep';
},
},
ack_timeout: {
label: '✓ Acknowledge Timeout',
+5 -1
View File
@@ -37,6 +37,7 @@
<!-- template for the 'action-button' Vue.js component -->
<script type='text/x-template' id='template_action_button'>
<a class='worker-action'
:title="action.title"
@click="$emit('worker-action', worker_id, action_key)"
>{{action.icon}}</a>
</script>
@@ -137,7 +138,10 @@
</td>
<td class='col-narrow'><label :for="checkbox_id">{{ worker.nickname }}</label></td>
<td class='col-narrow'>{{ worker.status || '-none-' }}
<span v-if="worker.status_requested"> {{ worker.status_requested}}</span>
<span v-if="worker.status_requested">
<span v-if="worker.lazy_status_request" title="Status change queued for after current task is finished."></span>
<span v-else title="Immediate status change is requested."></span>
{{ worker.status_requested}}</span>
</td>
<template v-if="mode != 'edit_schedule'">