WIP: 103268-job-task-progress #104185

Draft
Nitin-Rawat-1 wants to merge 19 commits from Nitin-Rawat-1/flamenco:103268-job-task-progress into main

3 changed files with 97 additions and 5 deletions
Showing only changes of commit 779cb9cab2


@@ -187,10 +187,25 @@ func (f *Flamenco) onTaskFailed(
 		Int("failedByWorkerCount", numFailed).
 		Int("threshold", threshold).
 		Logger()

-	if numFailed < threshold {
-		return f.softFailTask(ctx, logger, worker, task, numFailed)
+	if numFailed >= threshold {
+		return f.hardFailTask(ctx, logger, worker, task, numFailed)
 	}
-	return f.hardFailTask(ctx, logger, worker, task, numFailed)
+
+	numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
+	if err != nil {
+		return err
+	}
+
+	// If the number of workers capable of running the failed task is 1, there is no
+	// worker besides the one that just failed it: at this point the failing worker
+	// has not yet been registered as failing this task, and thus is still counted.
+	// In that case, fail the job itself.
+	if numWorkers <= 1 {
+		return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task)
+	}
+
+	return f.softFailTask(ctx, logger, worker, task, numFailed)
 }

 // maybeBlocklistWorker potentially block-lists the Worker, and checks whether
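
The new branch calls numWorkersCapableOfRunningTask, which is not included in this commit. Judging by the persistence calls mocked in the tests below (WorkersLeftToRun and FetchTaskFailureList), it could look roughly like the sketch here; the field name f.persist and the exact signature are assumptions, not part of this diff.

// Sketch only, not part of this commit. Assumes the persistence layer is reachable
// as f.persist and that WorkersLeftToRun returns a map keyed by worker UUID.
func (f *Flamenco) numWorkersCapableOfRunningTask(ctx context.Context, task *persistence.Task) (int, error) {
	// Workers that are still allowed to run tasks of this type on this job,
	// i.e. not blocklisted for it.
	workersLeft, err := f.persist.WorkersLeftToRun(ctx, task.Job, task.Type)
	if err != nil {
		return 0, err
	}

	// Workers that already failed this particular task should not be counted again.
	failers, err := f.persist.FetchTaskFailureList(ctx, task)
	if err != nil {
		return 0, err
	}
	for _, failer := range failers {
		delete(workersLeft, failer.UUID)
	}

	return len(workersLeft), nil
}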


@@ -134,6 +134,11 @@ func TestTaskUpdateFailed(t *testing.T) {
 	// This returns 1, which is less than the failure threshold -> soft failure expected.
 	mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
+	mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
+		Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true, "ce312357-29cd-4389-81ab-4d43e30945f8": true}, nil)
+	mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
+		Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
+
 	// Expect soft failure.
 	mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusSoftFailed)
 	mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
@@ -220,9 +225,9 @@ func TestBlockingAfterFailure(t *testing.T) {
 	{
 		// Mimick that there is another worker to work on this task, so the job should continue happily.
 		mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
-			Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true}, nil)
+			Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true, "ce312357-29cd-4389-81ab-4d43e30945f8": true}, nil).Times(2)
 		mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
-			Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
+			Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil).Times(2)

 		// Expect the Worker to be added to the list of failed workers for this task.
 		// This returns 1, which is less than the failure threshold -> soft failure.
@@ -313,3 +318,69 @@ func TestBlockingAfterFailure(t *testing.T) {
 		assertResponseNoContent(t, echoCtx)
 	}
 }
+
+func TestJobFailureAfterWorkerTaskFailure(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+
+	mf := newMockedFlamenco(mockCtrl)
+	worker := testWorker()
+
+	// Construct the JSON request object.
+	taskUpdate := api.TaskUpdateJSONRequestBody{
+		TaskStatus: ptr(api.TaskStatusFailed),
+	}
+
+	// Construct the task that's supposed to be updated.
+	taskID := "181eab68-1123-4790-93b1-94309a899411"
+	jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5"
+	mockJob := persistence.Job{UUID: jobID}
+	mockTask := persistence.Task{
+		UUID:     taskID,
+		Worker:   &worker,
+		WorkerID: &worker.ID,
+		Job:      &mockJob,
+		Activity: "pre-update activity",
+		Type:     "misc",
+	}
+
+	conf := config.Conf{
+		Base: config.Base{
+			TaskFailAfterSoftFailCount: 3,
+			BlocklistThreshold:         65535, // This test doesn't cover blocklisting.
+		},
+	}
+	mf.config.EXPECT().Get().Return(&conf).Times(2)
+
+	mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil)
+	mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask)
+	mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker)
+	mf.persistence.EXPECT().CountTaskFailuresOfWorker(gomock.Any(), &mockJob, &worker, "misc").Return(0, nil)
+	mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
+	mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
+		Return(map[string]bool{"e7632d62-c3b8-4af0-9e78-01752928952c": true}, nil)
+	mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
+		Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
+
+	// Expect hard failure of the task, because there are no workers left to perform it.
+	mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed)
+	mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
+		"Task failed by worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c), Manager will fail the entire job "+
+			"as there are no more workers left for tasks of type \"misc\".")
+
+	// Expect failure of the job.
+	mf.stateMachine.EXPECT().
+		JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"")
+
+	// Do the call.
+	echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
+	requestWorkerStore(echoCtx, &worker)
+	err := mf.flamenco.TaskUpdate(echoCtx, taskID)
+
+	assert.NoError(t, err)
+	assertResponseNoContent(t, echoCtx)
+}
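
The test above pins down the observable behaviour of failJobAfterCatastroficTaskFailure, which itself is not shown in this commit: it writes a timestamped task log line, hard-fails the task, and then fails the whole job. A rough sketch consistent with those mock expectations follows; it would live in the same package as onTaskFailed, and the field names f.logStorage and f.stateMachine, the parameter order, and the logger type are assumptions rather than details confirmed by this diff.

// Sketch only, inferred from the mock expectations in TestJobFailureAfterWorkerTaskFailure;
// not the actual implementation from this PR.
func (f *Flamenco) failJobAfterCatastroficTaskFailure(
	ctx context.Context,
	logger zerolog.Logger,
	worker *persistence.Worker,
	task *persistence.Task,
) error {
	// Record on the task why the Manager is giving up on the entire job.
	taskLog := fmt.Sprintf(
		"Task failed by worker %s (%s), Manager will fail the entire job "+
			"as there are no more workers left for tasks of type %q.",
		worker.Name, worker.UUID, task.Type)
	if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
		return err
	}

	// Hard-fail the task itself...
	if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed); err != nil {
		return err
	}

	// ...and then fail the job, since no other worker can pick this task up.
	reason := fmt.Sprintf("no more workers left to run tasks of type %q", task.Type)
	return f.stateMachine.JobStatusChange(ctx, task.Job, api.JobStatusFailed, reason)
}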


@@ -15,6 +15,12 @@ Basically there are three approaches to this:

 Each is explained below.

+{{< hint type=Warning >}}
+On Windows, Flamenco **only supports drive letters** to indicate locations.
+Flamenco does **not** support UNC notation like `\\SERVER\share`. Mount the
+network share to a drive letter. The examples below use `S:` for this.
+{{< /hint >}}
+
 ## Work Directly on the Shared Storage

 Working directly in the shared storage is the simplest way to work with