From fc1d1a79cdfce5441aa94ac1e30aaa59061fd77a Mon Sep 17 00:00:00 2001 From: "Anish Bharadwaj (he)" Date: Mon, 3 Apr 2023 19:32:43 -0700 Subject: [PATCH 1/3] Update jobs.go Setup Dependency update to run sql command in batches, allowing for larger task sizes in Flamenco Renders --- internal/manager/persistence/jobs.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index fa893f8d..0ce1ecf8 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -208,11 +208,19 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au } deps[i] = depTask } - - dbTask.Dependencies = deps - subQuery := tx.Model(dbTask).Updates(Task{Dependencies: deps}) - if subQuery.Error != nil { - return taskError(subQuery.Error, "unable to store dependencies of task %q", authoredTask.UUID) + dependenciesbatchsize := 1000 + for j := 0; j < len(deps); j += dependenciesbatchsize { + end := j + dependenciesbatchsize + if end > len(deps) { + end = len(deps) + } + currentDeps := deps[j:end] + dbTask.Dependencies = currentDeps + tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID) + subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps}) + if subQuery.Error != nil { + return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end) + } } } -- 2.30.2 From f8a9e009bbca245204d6e7bd6f7fd35095dbe2dc Mon Sep 17 00:00:00 2001 From: "Anish Bharadwaj (he)" Date: Wed, 12 Apr 2023 04:23:18 -0700 Subject: [PATCH 2/3] added unit test that can generate jobs with specified number of tasks --- internal/manager/persistence/jobs_test.go | 53 +++++++++++++++++++ .../persistence/task_scheduler_test.go | 1 + 2 files changed, 54 insertions(+) diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index e482fdec..043ee984 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -258,6 +258,18 @@ func TestCountTasksOfJobInStatus(t *testing.T) { assert.Equal(t, 3, numTotal) } +func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) { + numtasks := 3500 + ctx, close, db, job, _ := jobTasksTestFixturesWithTaskNum(t, numtasks) + defer close() + + numQueued, numTotal, err := db.CountTasksOfJobInStatus(ctx, job, api.TaskStatusQueued) + assert.NoError(t, err) + assert.Equal(t, numtasks, numQueued) + assert.Equal(t, numtasks, numTotal) + +} + func TestFetchJobsInStatus(t *testing.T) { ctx, close, db, job1, _ := jobTasksTestFixtures(t) defer close() @@ -594,6 +606,37 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob { return createTestAuthoredJob("263fd47e-b9f8-4637-b726-fd7e47ecfdae", task1, task2, task3) } +//Create a Job with a Specified number of tasks +func createTestAuthoredJobWithNumTasks(numTasks int) job_compilers.AuthoredJob { + //Generates all of the render jobs + prevtasks := make([]*job_compilers.AuthoredTask, 0) + for i := 0; i < numTasks-1; i++ { + currtask := job_compilers.AuthoredTask{ + Name: "render-" + fmt.Sprintf("%d", i), + Type: "ffmpeg", + UUID: uuid.New(), + Commands: []job_compilers.AuthoredCommand{}, + } + prevtasks = append(prevtasks, &currtask) + } + // Generates the preview video command with Dependencies + videoJob := job_compilers.AuthoredTask{ + Name: "preview-video", + Type: "ffmpeg", + UUID: uuid.New(), + Commands: []job_compilers.AuthoredCommand{}, + Dependencies: prevtasks, + } + // convert pointers to values and generate job + taskvalues := make([]job_compilers.AuthoredTask, len(prevtasks)) + for i, ptr := range prevtasks { + taskvalues[i] = *ptr + } + taskvalues = append(taskvalues, videoJob) + return createTestAuthoredJob(uuid.New(), taskvalues...) + +} + func createTestAuthoredJob(jobID string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob { job := job_compilers.AuthoredJob{ JobID: jobID, @@ -676,6 +719,16 @@ func jobTasksTestFixtures(t *testing.T) (context.Context, context.CancelFunc, *D return ctx, cancel, db, dbJob, authoredJob } +//This created Test Jobs using the new function createTestAuthoredJobWithNumTasks so that you can set the number of tasks +func jobTasksTestFixturesWithTaskNum(t *testing.T, numtasks int) (context.Context, context.CancelFunc, *DB, *Job, job_compilers.AuthoredJob) { + ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeoutlong) + + authoredJob := createTestAuthoredJobWithNumTasks(numtasks) + dbJob := persistAuthoredJob(t, ctx, db, authoredJob) + + return ctx, cancel, db, dbJob, authoredJob +} + func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*Worker)) *Worker { w := Worker{ UUID: "f0a123a9-ab05-4ce2-8577-94802cfe74a4", diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index c0acfc95..90fe0b8f 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -16,6 +16,7 @@ import ( ) const schedulerTestTimeout = 100 * time.Millisecond +const schedulerTestTimeoutlong = 5000 * time.Millisecond func TestNoTasks(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) -- 2.30.2 From 140115f8206b479971e645dd4533b967513d4ddc Mon Sep 17 00:00:00 2001 From: "Anish Bharadwaj (he)" Date: Mon, 17 Apr 2023 15:29:00 -0700 Subject: [PATCH 3/3] set TestCheckIfJobsHoldLargeNumberOfTasks to skip on -short tag --- internal/manager/persistence/jobs_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 043ee984..6fb0cfd4 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -259,6 +259,9 @@ func TestCountTasksOfJobInStatus(t *testing.T) { } func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) { + if testing.Short() { + t.Skip("Skipping test in short mode") + } numtasks := 3500 ctx, close, db, job, _ := jobTasksTestFixturesWithTaskNum(t, numtasks) defer close() @@ -606,14 +609,13 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob { return createTestAuthoredJob("263fd47e-b9f8-4637-b726-fd7e47ecfdae", task1, task2, task3) } -//Create a Job with a Specified number of tasks func createTestAuthoredJobWithNumTasks(numTasks int) job_compilers.AuthoredJob { //Generates all of the render jobs prevtasks := make([]*job_compilers.AuthoredTask, 0) for i := 0; i < numTasks-1; i++ { currtask := job_compilers.AuthoredTask{ Name: "render-" + fmt.Sprintf("%d", i), - Type: "ffmpeg", + Type: "blender-render", UUID: uuid.New(), Commands: []job_compilers.AuthoredCommand{}, } @@ -719,7 +721,7 @@ func jobTasksTestFixtures(t *testing.T) (context.Context, context.CancelFunc, *D return ctx, cancel, db, dbJob, authoredJob } -//This created Test Jobs using the new function createTestAuthoredJobWithNumTasks so that you can set the number of tasks +// This created Test Jobs using the new function createTestAuthoredJobWithNumTasks so that you can set the number of tasks func jobTasksTestFixturesWithTaskNum(t *testing.T, numtasks int) (context.Context, context.CancelFunc, *DB, *Job, job_compilers.AuthoredJob) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeoutlong) -- 2.30.2