Check for number of workers before soft failing the task. #104195
@ -187,10 +187,25 @@ func (f *Flamenco) onTaskFailed(
|
|||||||
Int("failedByWorkerCount", numFailed).
|
Int("failedByWorkerCount", numFailed).
|
||||||
Int("threshold", threshold).
|
Int("threshold", threshold).
|
||||||
Logger()
|
Logger()
|
||||||
if numFailed < threshold {
|
|
||||||
Nitin-Rawat-1 marked this conversation as resolved
Outdated
|
|||||||
return f.softFailTask(ctx, logger, worker, task, numFailed)
|
if numFailed >= threshold {
|
||||||
}
|
|
||||||
return f.hardFailTask(ctx, logger, worker, task, numFailed)
|
return f.hardFailTask(ctx, logger, worker, task, numFailed)
|
||||||
|
}
|
||||||
|
|
||||||
|
numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
|
||||||
Nitin-Rawat-1 marked this conversation as resolved
Outdated
Sybren A. Stüvel
commented
I'm assuming that `1` is because this worker hasn't been registered as failing this task yet, and thus is still counted. If this is indeed the case, it should be mentioned in a comment, as it's not entirely obvious from glancing at the code. Or maybe I'm wrong, and then there should definitely be a comment that explains where the `1` comes from ;-)
Also it's better to use `numWorkers <= 1` here, as this should also hard-fail if there are zero workers left to run this task. Given that it's a very asynchronous system, there could be other factors that we just don't see right now here in this part of the code, that could lead to unexpected results.
|
|||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If number of workers capable of running the failed task again is "1",
|
||||||
|
// that means we have no worker besides the one that actually failed the task.
|
||||||
|
// Because at this point in code the worker hasn't been registered as failing this task yet,
|
||||||
|
// and thus it is still counted.
|
||||||
|
// In such condition we should just fail the job itself.
|
||||||
|
if numWorkers <= 1 {
|
||||||
|
return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task)
|
||||||
|
}
|
||||||
|
return f.softFailTask(ctx, logger, worker, task, numFailed)
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybeBlocklistWorker potentially block-lists the Worker, and checks whether
|
// maybeBlocklistWorker potentially block-lists the Worker, and checks whether
|
||||||
|
@ -134,6 +134,11 @@ func TestTaskUpdateFailed(t *testing.T) {
|
|||||||
// This returns 1, which is less than the failure threshold -> soft failure expected.
|
// This returns 1, which is less than the failure threshold -> soft failure expected.
|
||||||
mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
|
mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
|
||||||
|
|
||||||
|
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
|
||||||
|
Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true, "ce312357-29cd-4389-81ab-4d43e30945f8": true}, nil)
|
||||||
|
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
|
||||||
|
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
|
||||||
|
|
||||||
// Expect soft failure.
|
// Expect soft failure.
|
||||||
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusSoftFailed)
|
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusSoftFailed)
|
||||||
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
|
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
|
||||||
@ -220,9 +225,9 @@ func TestBlockingAfterFailure(t *testing.T) {
|
|||||||
{
|
{
|
||||||
// Mimick that there is another worker to work on this task, so the job should continue happily.
|
// Mimick that there is another worker to work on this task, so the job should continue happily.
|
||||||
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
|
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
|
||||||
Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true}, nil)
|
Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true, "ce312357-29cd-4389-81ab-4d43e30945f8": true}, nil).Times(2)
|
||||||
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
|
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
|
||||||
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
|
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil).Times(2)
|
||||||
|
|
||||||
// Expect the Worker to be added to the list of failed workers for this task.
|
// Expect the Worker to be added to the list of failed workers for this task.
|
||||||
// This returns 1, which is less than the failure threshold -> soft failure.
|
// This returns 1, which is less than the failure threshold -> soft failure.
|
||||||
@ -313,3 +318,69 @@ func TestBlockingAfterFailure(t *testing.T) {
|
|||||||
assertResponseNoContent(t, echoCtx)
|
assertResponseNoContent(t, echoCtx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestJobFailureAfterWorkerTaskFailure(t *testing.T) {
|
||||||
|
mockCtrl := gomock.NewController(t)
|
||||||
|
defer mockCtrl.Finish()
|
||||||
|
|
||||||
|
mf := newMockedFlamenco(mockCtrl)
|
||||||
|
worker := testWorker()
|
||||||
|
|
||||||
|
// Contruct the JSON request object
|
||||||
|
taskUpdate := api.TaskUpdateJSONRequestBody{
|
||||||
|
TaskStatus: ptr(api.TaskStatusFailed),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct the task that's supposed to be updated.
|
||||||
|
taskID := "181eab68-1123-4790-93b1-94309a899411"
|
||||||
|
jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5"
|
||||||
|
mockJob := persistence.Job{UUID: jobID}
|
||||||
|
mockTask := persistence.Task{
|
||||||
|
UUID: taskID,
|
||||||
|
Worker: &worker,
|
||||||
|
WorkerID: &worker.ID,
|
||||||
|
Job: &mockJob,
|
||||||
|
Activity: "pre-update activity",
|
||||||
|
Type: "misc",
|
||||||
|
}
|
||||||
|
|
||||||
|
conf := config.Conf{
|
||||||
|
Base: config.Base{
|
||||||
|
TaskFailAfterSoftFailCount: 3,
|
||||||
|
BlocklistThreshold: 65535, // This test doesn't cover blocklisting.
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
mf.config.EXPECT().Get().Return(&conf).Times(2)
|
||||||
|
|
||||||
|
mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil)
|
||||||
|
|
||||||
|
mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask)
|
||||||
|
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
|
||||||
|
|
||||||
|
mf.persistence.EXPECT().CountTaskFailuresOfWorker(gomock.Any(), &mockJob, &worker, "misc").Return(0, nil)
|
||||||
|
|
||||||
|
mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
|
||||||
|
|
||||||
|
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
|
||||||
|
Return(map[string]bool{"e7632d62-c3b8-4af0-9e78-01752928952c": true}, nil)
|
||||||
|
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
|
||||||
|
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
|
||||||
|
|
||||||
|
// Expect hard failure of the task, because there are no workers left to perfom it.
|
||||||
|
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed)
|
||||||
|
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
|
||||||
|
"Task failed by worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c), Manager will fail the entire job "+
|
||||||
|
"as there are no more workers left for tasks of type \"misc\".")
|
||||||
|
|
||||||
|
// Expect failure of the job.
|
||||||
|
mf.stateMachine.EXPECT().
|
||||||
|
JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"")
|
||||||
|
|
||||||
|
// Do the call
|
||||||
|
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
|
||||||
|
requestWorkerStore(echoCtx, &worker)
|
||||||
|
err := mf.flamenco.TaskUpdate(echoCtx, taskID)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assertResponseNoContent(t, echoCtx)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user
If you flip this condition, you can return early and reduce the nesting level of the following code.