diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index fa893f8d..0ce1ecf8 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -208,11 +208,19 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au } deps[i] = depTask } - - dbTask.Dependencies = deps - subQuery := tx.Model(dbTask).Updates(Task{Dependencies: deps}) - if subQuery.Error != nil { - return taskError(subQuery.Error, "unable to store dependencies of task %q", authoredTask.UUID) + dependenciesbatchsize := 1000 + for j := 0; j < len(deps); j += dependenciesbatchsize { + end := j + dependenciesbatchsize + if end > len(deps) { + end = len(deps) + } + currentDeps := deps[j:end] + dbTask.Dependencies = currentDeps + tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID) + subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps}) + if subQuery.Error != nil { + return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end) + } } } diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index e482fdec..6fb0cfd4 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -258,6 +258,21 @@ func TestCountTasksOfJobInStatus(t *testing.T) { assert.Equal(t, 3, numTotal) } +func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) { + if testing.Short() { + t.Skip("Skipping test in short mode") + } + numtasks := 3500 + ctx, close, db, job, _ := jobTasksTestFixturesWithTaskNum(t, numtasks) + defer close() + + numQueued, numTotal, err := db.CountTasksOfJobInStatus(ctx, job, api.TaskStatusQueued) + assert.NoError(t, err) + assert.Equal(t, numtasks, numQueued) + assert.Equal(t, numtasks, numTotal) + +} + func TestFetchJobsInStatus(t *testing.T) { ctx, close, db, job1, _ := jobTasksTestFixtures(t) defer close() @@ -594,6 +609,36 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob { return createTestAuthoredJob("263fd47e-b9f8-4637-b726-fd7e47ecfdae", task1, task2, task3) } +func createTestAuthoredJobWithNumTasks(numTasks int) job_compilers.AuthoredJob { + //Generates all of the render jobs + prevtasks := make([]*job_compilers.AuthoredTask, 0) + for i := 0; i < numTasks-1; i++ { + currtask := job_compilers.AuthoredTask{ + Name: "render-" + fmt.Sprintf("%d", i), + Type: "blender-render", + UUID: uuid.New(), + Commands: []job_compilers.AuthoredCommand{}, + } + prevtasks = append(prevtasks, &currtask) + } + // Generates the preview video command with Dependencies + videoJob := job_compilers.AuthoredTask{ + Name: "preview-video", + Type: "ffmpeg", + UUID: uuid.New(), + Commands: []job_compilers.AuthoredCommand{}, + Dependencies: prevtasks, + } + // convert pointers to values and generate job + taskvalues := make([]job_compilers.AuthoredTask, len(prevtasks)) + for i, ptr := range prevtasks { + taskvalues[i] = *ptr + } + taskvalues = append(taskvalues, videoJob) + return createTestAuthoredJob(uuid.New(), taskvalues...) + +} + func createTestAuthoredJob(jobID string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob { job := job_compilers.AuthoredJob{ JobID: jobID, @@ -676,6 +721,16 @@ func jobTasksTestFixtures(t *testing.T) (context.Context, context.CancelFunc, *D return ctx, cancel, db, dbJob, authoredJob } +// This created Test Jobs using the new function createTestAuthoredJobWithNumTasks so that you can set the number of tasks +func jobTasksTestFixturesWithTaskNum(t *testing.T, numtasks int) (context.Context, context.CancelFunc, *DB, *Job, job_compilers.AuthoredJob) { + ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeoutlong) + + authoredJob := createTestAuthoredJobWithNumTasks(numtasks) + dbJob := persistAuthoredJob(t, ctx, db, authoredJob) + + return ctx, cancel, db, dbJob, authoredJob +} + func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*Worker)) *Worker { w := Worker{ UUID: "f0a123a9-ab05-4ce2-8577-94802cfe74a4", diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index c0acfc95..90fe0b8f 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -16,6 +16,7 @@ import ( ) const schedulerTestTimeout = 100 * time.Millisecond +const schedulerTestTimeoutlong = 5000 * time.Millisecond func TestNoTasks(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)