Check for number of workers before soft failing the task. #104195

Merged
Sybren A. Stüvel merged 15 commits from Nitin-Rawat-1/flamenco:104190-job-stuck into main 2023-04-20 11:53:43 +02:00
2 changed files with 91 additions and 5 deletions

View File

@@ -187,10 +187,25 @@ func (f *Flamenco) onTaskFailed(
Int("failedByWorkerCount", numFailed).
Int("threshold", threshold).
Logger()
if numFailed < threshold {
return f.softFailTask(ctx, logger, worker, task, numFailed)
Nitin-Rawat-1 marked this conversation as resolved (outdated)

If you flip this condition, you can return early and reduce the nesting level of the following code.

if numFailed > threshold {
    return f.hardFailTask(ctx, logger, worker, task, numFailed)
}
numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
...
if numFailed >= threshold {
return f.hardFailTask(ctx, logger, worker, task, numFailed)
}
return f.hardFailTask(ctx, logger, worker, task, numFailed)
numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
Nitin-Rawat-1 marked this conversation as resolved (outdated)

I'm assuming that 1 is because this worker hasn't been registered as failing this task yet, and thus is still counted. If this is indeed the case, it should be mentioned in a comment, as it's not entirely obvious from glancing at the code. Or maybe I'm wrong, and then there should definitely be a comment that explains where the 1 comes from ;-)

Also it's better to use numWorkers <= 1 here, as this should also hard-fail if there are zero workers left to run this task. Given that it's a very asynchronous system, there could be other factors that we just don't see right now here in this part of the code, that could lead to unexpected results.

if err != nil {
return err
}
// If the number of workers capable of running the failed task is 1, there is
// no worker besides the one that actually failed it: at this point in the code,
// that worker hasn't been registered as failing this task yet, and thus is
// still counted. In that case, just fail the job itself.
if numWorkers <= 1 {
return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task)
}
return f.softFailTask(ctx, logger, worker, task, numFailed)
}
// maybeBlocklistWorker potentially block-lists the Worker, and checks whether

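The review thread above asks where the `1` in the `numWorkers <= 1` comparison comes from. Below is a minimal sketch of how `numWorkersCapableOfRunningTask` could produce that count by combining the two persistence calls the tests mock (`WorkersLeftToRun` and `FetchTaskFailureList`); the `f.persist` field name is an assumption, and this is a sketch rather than the merged implementation.

```go
// numWorkersCapableOfRunningTask counts the workers that could still pick up
// this task. The worker that is currently reporting the failure has not been
// added to the task's failure list yet, so it is still included in the count;
// that is why the caller compares against 1 rather than 0.
// NOTE: sketch only; `f.persist` and the error handling are assumptions.
func (f *Flamenco) numWorkersCapableOfRunningTask(ctx context.Context, task *persistence.Task) (int, error) {
	// Workers that are allowed to run tasks of this type on this job.
	workersLeft, err := f.persist.WorkersLeftToRun(ctx, task.Job, task.Type)
	if err != nil {
		return 0, err
	}

	// Workers that already failed this particular task will not get it again,
	// so remove them from the candidate set.
	failers, err := f.persist.FetchTaskFailureList(ctx, task)
	if err != nil {
		return 0, err
	}
	for _, failer := range failers {
		delete(workersLeft, failer.UUID)
	}

	return len(workersLeft), nil
}
```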
View File

@@ -134,6 +134,11 @@ func TestTaskUpdateFailed(t *testing.T) {
// This returns 1, which is less than the failure threshold -> soft failure expected.
mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true, "ce312357-29cd-4389-81ab-4d43e30945f8": true}, nil)
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
// Expect soft failure.
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusSoftFailed)
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
@@ -220,9 +225,9 @@ func TestBlockingAfterFailure(t *testing.T) {
{
// Mimic that there is another worker to work on this task, so the job should continue happily.
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true}, nil)
Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true, "ce312357-29cd-4389-81ab-4d43e30945f8": true}, nil).Times(2)
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil).Times(2)
// Expect the Worker to be added to the list of failed workers for this task.
// This returns 1, which is less than the failure threshold -> soft failure.
@@ -313,3 +318,69 @@ func TestBlockingAfterFailure(t *testing.T) {
assertResponseNoContent(t, echoCtx)
}
}
func TestJobFailureAfterWorkerTaskFailure(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
// Construct the JSON request object
taskUpdate := api.TaskUpdateJSONRequestBody{
TaskStatus: ptr(api.TaskStatusFailed),
}
// Construct the task that's supposed to be updated.
taskID := "181eab68-1123-4790-93b1-94309a899411"
jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5"
mockJob := persistence.Job{UUID: jobID}
mockTask := persistence.Task{
UUID: taskID,
Worker: &worker,
WorkerID: &worker.ID,
Job: &mockJob,
Activity: "pre-update activity",
Type: "misc",
}
conf := config.Conf{
Base: config.Base{
TaskFailAfterSoftFailCount: 3,
BlocklistThreshold: 65535, // This test doesn't cover blocklisting.
},
}
mf.config.EXPECT().Get().Return(&conf).Times(2)
mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil)
mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask)
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
mf.persistence.EXPECT().CountTaskFailuresOfWorker(gomock.Any(), &mockJob, &worker, "misc").Return(0, nil)
mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
Return(map[string]bool{"e7632d62-c3b8-4af0-9e78-01752928952c": true}, nil)
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
// Expect hard failure of the task, because there are no workers left to perform it.
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed)
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
"Task failed by worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c), Manager will fail the entire job "+
"as there are no more workers left for tasks of type \"misc\".")
// Expect failure of the job.
mf.stateMachine.EXPECT().
JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"")
// Do the call
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
requestWorkerStore(echoCtx, &worker)
err := mf.flamenco.TaskUpdate(echoCtx, taskID)
assert.NoError(t, err)
assertResponseNoContent(t, echoCtx)
}
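The expectations in `TestJobFailureAfterWorkerTaskFailure` pin down what `failJobAfterCatastroficTaskFailure` has to do: hard-fail the task, write a timestamped task-log entry, and fail the job with a reason. Below is a minimal sketch of a helper that would satisfy those expectations; the `f.stateMachine` and `f.logStorage` field names, the `ctx` argument to `WriteTimestamped`, and the error handling are assumptions inferred from the mocks, not necessarily the merged code.

```go
// failJobAfterCatastroficTaskFailure fails the task and then the entire job,
// because no other worker is capable of running tasks of this type.
// NOTE: sketch only; field names and some signatures are assumptions.
func (f *Flamenco) failJobAfterCatastroficTaskFailure(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) error {
	// Mark the task itself as failed.
	if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed); err != nil {
		return err
	}

	// Explain in the task log why the Manager gives up on the entire job.
	taskLogMsg := fmt.Sprintf(
		"Task failed by worker %s (%s), Manager will fail the entire job "+
			"as there are no more workers left for tasks of type %q.",
		worker.Name, worker.UUID, task.Type)
	f.logStorage.WriteTimestamped(ctx, task.Job.UUID, task.UUID, taskLogMsg)

	// Fail the job itself, giving a human-readable reason.
	reason := fmt.Sprintf("no more workers left to run tasks of type %q", task.Type)
	logger.Warn().Str("reason", reason).Msg("failing entire job")
	return f.stateMachine.JobStatusChange(ctx, task.Job, api.JobStatusFailed, reason)
}
```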