From 9fdf5aa7c5d1327cd876492749e85a5ed534f6b9 Mon Sep 17 00:00:00 2001 From: Nitin Rawat Date: Thu, 9 Mar 2023 23:45:16 +0530 Subject: [PATCH 1/6] Manager: fixed issue #104190 job getting stuck with less workers than soft-failed threshold before soft-failing check the number of workers to decide if job should be failed or not. --- internal/manager/api_impl/worker_task_updates.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/manager/api_impl/worker_task_updates.go b/internal/manager/api_impl/worker_task_updates.go index f6767270..97744544 100644 --- a/internal/manager/api_impl/worker_task_updates.go +++ b/internal/manager/api_impl/worker_task_updates.go @@ -188,6 +188,13 @@ func (f *Flamenco) onTaskFailed( Int("threshold", threshold). Logger() if numFailed < threshold { + numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task) + if err != nil { + return err + } + if numWorkers == 1 { + return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task) + } return f.softFailTask(ctx, logger, worker, task, numFailed) } return f.hardFailTask(ctx, logger, worker, task, numFailed) -- 2.30.2 From 6e24e0be3b7e8378128528744ed14f9cedb8b87e Mon Sep 17 00:00:00 2001 From: Nitin Rawat Date: Fri, 17 Mar 2023 15:13:22 +0530 Subject: [PATCH 2/6] reviese the conditions for job failure --- .../manager/api_impl/worker_task_updates.go | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/internal/manager/api_impl/worker_task_updates.go b/internal/manager/api_impl/worker_task_updates.go index 97744544..ffcd20b3 100644 --- a/internal/manager/api_impl/worker_task_updates.go +++ b/internal/manager/api_impl/worker_task_updates.go @@ -187,17 +187,25 @@ func (f *Flamenco) onTaskFailed( Int("failedByWorkerCount", numFailed). Int("threshold", threshold). Logger() - if numFailed < threshold { - numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task) - if err != nil { - return err - } - if numWorkers == 1 { - return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task) - } - return f.softFailTask(ctx, logger, worker, task, numFailed) + + if numFailed > threshold { + return f.hardFailTask(ctx, logger, worker, task, numFailed) } - return f.hardFailTask(ctx, logger, worker, task, numFailed) + + numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task) + if err != nil { + return err + } + + // If number of workers capable of running the failed task again is "1", + // that means we have no worker besides the one that actually failed the task. + // Because at this point in code the worker hasn't been registered as failing this task yet, + // and thus it is still counted. + // In such condition we should just fail the job itself. + if numWorkers <= 1 { + return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task) + } + return f.softFailTask(ctx, logger, worker, task, numFailed) } // maybeBlocklistWorker potentially block-lists the Worker, and checks whether -- 2.30.2 From ac88d57ede8fa66886e53a5bb97e4d2f834b0a55 Mon Sep 17 00:00:00 2001 From: Nitin Rawat Date: Tue, 4 Apr 2023 10:01:44 +0530 Subject: [PATCH 3/6] We should also hard fail the task when numFailed == threshold --- internal/manager/api_impl/worker_task_updates.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/manager/api_impl/worker_task_updates.go b/internal/manager/api_impl/worker_task_updates.go index ffcd20b3..7f55ee81 100644 --- a/internal/manager/api_impl/worker_task_updates.go +++ b/internal/manager/api_impl/worker_task_updates.go @@ -188,7 +188,7 @@ func (f *Flamenco) onTaskFailed( Int("threshold", threshold). Logger() - if numFailed > threshold { + if numFailed >= threshold { return f.hardFailTask(ctx, logger, worker, task, numFailed) } -- 2.30.2 From 03533c1e49e90f46c97d4a4eed69a0c00c975034 Mon Sep 17 00:00:00 2001 From: Nitin Rawat Date: Tue, 4 Apr 2023 11:49:40 +0530 Subject: [PATCH 4/6] Tests for TaskUpdate needs to be updated. --- internal/manager/api_impl/worker_task_updates_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/manager/api_impl/worker_task_updates_test.go b/internal/manager/api_impl/worker_task_updates_test.go index 637c649a..37747bda 100644 --- a/internal/manager/api_impl/worker_task_updates_test.go +++ b/internal/manager/api_impl/worker_task_updates_test.go @@ -134,6 +134,11 @@ func TestTaskUpdateFailed(t *testing.T) { // This returns 1, which is less than the failure threshold -> soft failure expected. mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil) + mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc"). + Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true, "ce312357-29cd-4389-81ab-4d43e30945f8": true}, nil) + mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask). + Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil) + // Expect soft failure. mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusSoftFailed) mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID, @@ -220,9 +225,9 @@ func TestBlockingAfterFailure(t *testing.T) { { // Mimick that there is another worker to work on this task, so the job should continue happily. mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc"). - Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true}, nil) + Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true, "ce312357-29cd-4389-81ab-4d43e30945f8": true}, nil).Times(2) mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask). - Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil) + Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil).Times(2) // Expect the Worker to be added to the list of failed workers for this task. // This returns 1, which is less than the failure threshold -> soft failure. -- 2.30.2 From ff0a36d19dfd02adad55478abbcc1a64e8741560 Mon Sep 17 00:00:00 2001 From: Nitin Rawat Date: Tue, 4 Apr 2023 12:54:34 +0530 Subject: [PATCH 5/6] Add test to check the job failure condition when number of workers available for the job is less than failure threshold. --- .../api_impl/worker_task_updates_test.go | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/internal/manager/api_impl/worker_task_updates_test.go b/internal/manager/api_impl/worker_task_updates_test.go index 37747bda..7f76091f 100644 --- a/internal/manager/api_impl/worker_task_updates_test.go +++ b/internal/manager/api_impl/worker_task_updates_test.go @@ -318,3 +318,68 @@ func TestBlockingAfterFailure(t *testing.T) { assertResponseNoContent(t, echoCtx) } } + +func TestJobFailureAfterWorkerTaskFailure(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + + // Contruct the JSON request object + taskUpdate := api.TaskUpdateJSONRequestBody{ + TaskStatus: ptr(api.TaskStatusFailed), + } + + // Construct the task that's supposed to be updated. + taskID := "181eab68-1123-4790-93b1-94309a899411" + jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5" + mockJob := persistence.Job{UUID: jobID} + mockTask := persistence.Task{ + UUID: taskID, + Worker: &worker, + WorkerID: &worker.ID, + Job: &mockJob, + Activity: "pre-update activity", + Type: "misc", + } + + conf := config.Conf{ + Base: config.Base{ + TaskFailAfterSoftFailCount: 3, + BlocklistThreshold: 65535, // This test doesn't cover blocklisting. + }, + } + + mf.config.EXPECT().Get().Return(&conf).AnyTimes() + + mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil) + + mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask) + mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker) + + mf.persistence.EXPECT().CountTaskFailuresOfWorker(gomock.Any(), &mockJob, &worker, "misc").Return(0, nil) + + mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil) + + mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc"). + Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true}, nil) + mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask). + Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil) + + // Expect hard failure of the task, because there are no workers left to perfom it. + mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed) + mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID, + "Task failed by worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c), Manager will fail the entire job "+ + "as there are no more workers left for tasks of type \"misc\".") + + // Expect failure of the job. + mf.stateMachine.EXPECT(). + JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"") + + echoCtx := mf.prepareMockedJSONRequest(taskUpdate) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.TaskUpdate(echoCtx, taskID) + assert.NoError(t, err) + assertResponseNoContent(t, echoCtx) +} -- 2.30.2 From 1da9c33f223bf068a41b2a4020037d77e0994261 Mon Sep 17 00:00:00 2001 From: Nitin Rawat Date: Mon, 10 Apr 2023 11:21:14 +0530 Subject: [PATCH 6/6] WorkersLeftToRun should return the UUID of the test worker which is actually failing the task in the test. --- internal/manager/api_impl/worker_task_updates_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/manager/api_impl/worker_task_updates_test.go b/internal/manager/api_impl/worker_task_updates_test.go index 7f76091f..142ae0df 100644 --- a/internal/manager/api_impl/worker_task_updates_test.go +++ b/internal/manager/api_impl/worker_task_updates_test.go @@ -351,7 +351,7 @@ func TestJobFailureAfterWorkerTaskFailure(t *testing.T) { }, } - mf.config.EXPECT().Get().Return(&conf).AnyTimes() + mf.config.EXPECT().Get().Return(&conf).Times(2) mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil) @@ -363,7 +363,7 @@ func TestJobFailureAfterWorkerTaskFailure(t *testing.T) { mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil) mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc"). - Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true}, nil) + Return(map[string]bool{"e7632d62-c3b8-4af0-9e78-01752928952c": true}, nil) mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask). Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil) @@ -377,6 +377,7 @@ func TestJobFailureAfterWorkerTaskFailure(t *testing.T) { mf.stateMachine.EXPECT(). JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"") + // Do the call echoCtx := mf.prepareMockedJSONRequest(taskUpdate) requestWorkerStore(echoCtx, &worker) err := mf.flamenco.TaskUpdate(echoCtx, taskID) -- 2.30.2