Resolved Task Limit error in Flamenco Manager #104201 #104205
@ -208,11 +208,19 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au
|
|||||||
}
|
}
|
||||||
deps[i] = depTask
|
deps[i] = depTask
|
||||||
}
|
}
|
||||||
|
dependenciesbatchsize := 1000
|
||||||
dbTask.Dependencies = deps
|
for j := 0; j < len(deps); j += dependenciesbatchsize {
|
||||||
subQuery := tx.Model(dbTask).Updates(Task{Dependencies: deps})
|
end := j + dependenciesbatchsize
|
||||||
|
if end > len(deps) {
|
||||||
|
end = len(deps)
|
||||||
|
}
|
||||||
|
currentDeps := deps[j:end]
|
||||||
|
dbTask.Dependencies = currentDeps
|
||||||
|
tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID)
|
||||||
|
subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps})
|
||||||
if subQuery.Error != nil {
|
if subQuery.Error != nil {
|
||||||
return taskError(subQuery.Error, "unable to store dependencies of task %q", authoredTask.UUID)
|
return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -258,6 +258,21 @@ func TestCountTasksOfJobInStatus(t *testing.T) {
|
|||||||
assert.Equal(t, 3, numTotal)
|
assert.Equal(t, 3, numTotal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping test in short mode")
|
||||||
|
}
|
||||||
|
numtasks := 3500
|
||||||
|
ctx, close, db, job, _ := jobTasksTestFixturesWithTaskNum(t, numtasks)
|
||||||
|
defer close()
|
||||||
|
|
||||||
|
numQueued, numTotal, err := db.CountTasksOfJobInStatus(ctx, job, api.TaskStatusQueued)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, numtasks, numQueued)
|
||||||
|
assert.Equal(t, numtasks, numTotal)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func TestFetchJobsInStatus(t *testing.T) {
|
func TestFetchJobsInStatus(t *testing.T) {
|
||||||
ctx, close, db, job1, _ := jobTasksTestFixtures(t)
|
ctx, close, db, job1, _ := jobTasksTestFixtures(t)
|
||||||
defer close()
|
defer close()
|
||||||
@ -594,6 +609,36 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
|
|||||||
return createTestAuthoredJob("263fd47e-b9f8-4637-b726-fd7e47ecfdae", task1, task2, task3)
|
return createTestAuthoredJob("263fd47e-b9f8-4637-b726-fd7e47ecfdae", task1, task2, task3)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createTestAuthoredJobWithNumTasks(numTasks int) job_compilers.AuthoredJob {
|
||||||
|
//Generates all of the render jobs
|
||||||
|
prevtasks := make([]*job_compilers.AuthoredTask, 0)
|
||||||
|
for i := 0; i < numTasks-1; i++ {
|
||||||
|
currtask := job_compilers.AuthoredTask{
|
||||||
|
Name: "render-" + fmt.Sprintf("%d", i),
|
||||||
|
Type: "blender-render",
|
||||||
|
UUID: uuid.New(),
|
||||||
|
Commands: []job_compilers.AuthoredCommand{},
|
||||||
|
}
|
||||||
|
prevtasks = append(prevtasks, &currtask)
|
||||||
|
}
|
||||||
|
// Generates the preview video command with Dependencies
|
||||||
|
videoJob := job_compilers.AuthoredTask{
|
||||||
|
Name: "preview-video",
|
||||||
|
Type: "ffmpeg",
|
||||||
|
UUID: uuid.New(),
|
||||||
|
Commands: []job_compilers.AuthoredCommand{},
|
||||||
|
Dependencies: prevtasks,
|
||||||
|
}
|
||||||
|
// convert pointers to values and generate job
|
||||||
|
taskvalues := make([]job_compilers.AuthoredTask, len(prevtasks))
|
||||||
|
for i, ptr := range prevtasks {
|
||||||
|
taskvalues[i] = *ptr
|
||||||
|
}
|
||||||
|
taskvalues = append(taskvalues, videoJob)
|
||||||
|
return createTestAuthoredJob(uuid.New(), taskvalues...)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func createTestAuthoredJob(jobID string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob {
|
func createTestAuthoredJob(jobID string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob {
|
||||||
job := job_compilers.AuthoredJob{
|
job := job_compilers.AuthoredJob{
|
||||||
JobID: jobID,
|
JobID: jobID,
|
||||||
@ -676,6 +721,16 @@ func jobTasksTestFixtures(t *testing.T) (context.Context, context.CancelFunc, *D
|
|||||||
return ctx, cancel, db, dbJob, authoredJob
|
return ctx, cancel, db, dbJob, authoredJob
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This created Test Jobs using the new function createTestAuthoredJobWithNumTasks so that you can set the number of tasks
|
||||||
|
func jobTasksTestFixturesWithTaskNum(t *testing.T, numtasks int) (context.Context, context.CancelFunc, *DB, *Job, job_compilers.AuthoredJob) {
|
||||||
|
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeoutlong)
|
||||||
|
|
||||||
|
authoredJob := createTestAuthoredJobWithNumTasks(numtasks)
|
||||||
|
dbJob := persistAuthoredJob(t, ctx, db, authoredJob)
|
||||||
|
|
||||||
|
return ctx, cancel, db, dbJob, authoredJob
|
||||||
|
}
|
||||||
|
|
||||||
func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*Worker)) *Worker {
|
func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*Worker)) *Worker {
|
||||||
w := Worker{
|
w := Worker{
|
||||||
UUID: "f0a123a9-ab05-4ce2-8577-94802cfe74a4",
|
UUID: "f0a123a9-ab05-4ce2-8577-94802cfe74a4",
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const schedulerTestTimeout = 100 * time.Millisecond
|
const schedulerTestTimeout = 100 * time.Millisecond
|
||||||
|
const schedulerTestTimeoutlong = 5000 * time.Millisecond
|
||||||
|
|
||||||
func TestNoTasks(t *testing.T) {
|
func TestNoTasks(t *testing.T) {
|
||||||
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)
|
||||||
|
Loading…
Reference in New Issue
Block a user