Support pausing jobs #104313
2
go.mod
2
go.mod
@ -1,6 +1,6 @@
|
|||||||
module projects.blender.org/studio/flamenco
|
module projects.blender.org/studio/flamenco
|
||||||
|
|
||||||
go 1.22.3
|
go 1.22.4
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/adrg/xdg v0.4.0
|
github.com/adrg/xdg v0.4.0
|
||||||
|
@ -4,8 +4,10 @@ package persistence
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"gorm.io/gorm/clause"
|
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LastRendered only has one entry in its database table, to indicate the job
|
// LastRendered only has one entry in its database table, to indicate the job
|
||||||
@ -19,30 +21,32 @@ type LastRendered struct {
|
|||||||
|
|
||||||
// SetLastRendered sets this job as the one with the most recent rendered image.
|
// SetLastRendered sets this job as the one with the most recent rendered image.
|
||||||
func (db *DB) SetLastRendered(ctx context.Context, j *Job) error {
|
func (db *DB) SetLastRendered(ctx context.Context, j *Job) error {
|
||||||
render := LastRendered{
|
queries, err := db.queries()
|
||||||
// Always use the same database ID to ensure a single entry.
|
if err != nil {
|
||||||
Model: Model{ID: uint(1)},
|
return err
|
||||||
|
|
||||||
JobID: j.ID,
|
|
||||||
Job: j,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tx := db.gormDB.
|
now := db.now()
|
||||||
WithContext(ctx).
|
return queries.SetLastRendered(ctx, sqlc.SetLastRenderedParams{
|
||||||
Clauses(clause.OnConflict{UpdateAll: true}).
|
CreatedAt: now.Time,
|
||||||
Create(&render)
|
UpdatedAt: now,
|
||||||
return tx.Error
|
JobID: int64(j.ID),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLastRendered returns the UUID of the job with the most recent rendered image.
|
// GetLastRendered returns the UUID of the job with the most recent rendered image.
|
||||||
func (db *DB) GetLastRenderedJobUUID(ctx context.Context) (string, error) {
|
func (db *DB) GetLastRenderedJobUUID(ctx context.Context) (string, error) {
|
||||||
job := Job{}
|
queries, err := db.queries()
|
||||||
tx := db.gormDB.WithContext(ctx).
|
if err != nil {
|
||||||
Joins("inner join last_rendereds LR on jobs.id = LR.job_id").
|
return "", err
|
||||||
Select("uuid").
|
|
||||||
Find(&job)
|
|
||||||
if tx.Error != nil {
|
|
||||||
return "", jobError(tx.Error, "finding job with most rencent render")
|
|
||||||
}
|
}
|
||||||
return job.UUID, nil
|
|
||||||
|
jobUUID, err := queries.GetLastRenderedJobUUID(ctx)
|
||||||
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return "", jobError(err, "finding job with most rencent render")
|
||||||
|
}
|
||||||
|
return jobUUID, nil
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,4 @@
|
|||||||
|
|
||||||
-- Jobs / Tasks queries
|
|
||||||
--
|
|
||||||
|
|
||||||
-- name: CreateJob :exec
|
-- name: CreateJob :exec
|
||||||
INSERT INTO jobs (
|
INSERT INTO jobs (
|
||||||
created_at,
|
created_at,
|
||||||
@ -190,3 +187,19 @@ WHERE task_id in (SELECT id FROM tasks WHERE job_id=@job_id);
|
|||||||
SELECT sqlc.embed(workers) FROM workers
|
SELECT sqlc.embed(workers) FROM workers
|
||||||
INNER JOIN task_failures TF on TF.worker_id=workers.id
|
INNER JOIN task_failures TF on TF.worker_id=workers.id
|
||||||
WHERE TF.task_id=@task_id;
|
WHERE TF.task_id=@task_id;
|
||||||
|
|
||||||
|
-- name: SetLastRendered :exec
|
||||||
|
-- Set the 'last rendered' job info.
|
||||||
|
--
|
||||||
|
-- Note that the use of ?2 and ?3 in the SQL is not desirable, and should be
|
||||||
|
-- replaced with @updated_at and @job_id as soon as sqlc issue #3334 is fixed.
|
||||||
|
-- See https://github.com/sqlc-dev/sqlc/issues/3334 for more info.
|
||||||
|
INSERT INTO last_rendereds (id, created_at, updated_at, job_id)
|
||||||
|
VALUES (1, @created_at, @updated_at, @job_id)
|
||||||
|
ON CONFLICT DO UPDATE
|
||||||
|
SET updated_at=?2, job_id=?3
|
||||||
|
WHERE id=1;
|
||||||
|
|
||||||
|
-- name: GetLastRenderedJobUUID :one
|
||||||
|
SELECT uuid FROM jobs
|
||||||
|
INNER JOIN last_rendereds LR ON jobs.id = LR.job_id;
|
||||||
|
@ -64,7 +64,6 @@ func (q *Queries) CountWorkersFailingTask(ctx context.Context, taskID int64) (in
|
|||||||
}
|
}
|
||||||
|
|
||||||
const createJob = `-- name: CreateJob :exec
|
const createJob = `-- name: CreateJob :exec
|
||||||
|
|
||||||
INSERT INTO jobs (
|
INSERT INTO jobs (
|
||||||
created_at,
|
created_at,
|
||||||
uuid,
|
uuid,
|
||||||
@ -93,7 +92,6 @@ type CreateJobParams struct {
|
|||||||
StorageShamanCheckoutID string
|
StorageShamanCheckoutID string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Jobs / Tasks queries
|
|
||||||
func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error {
|
func (q *Queries) CreateJob(ctx context.Context, arg CreateJobParams) error {
|
||||||
_, err := q.db.ExecContext(ctx, createJob,
|
_, err := q.db.ExecContext(ctx, createJob,
|
||||||
arg.CreatedAt,
|
arg.CreatedAt,
|
||||||
@ -623,6 +621,18 @@ func (q *Queries) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, arg Fetch
|
|||||||
return items, nil
|
return items, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const getLastRenderedJobUUID = `-- name: GetLastRenderedJobUUID :one
|
||||||
|
SELECT uuid FROM jobs
|
||||||
|
INNER JOIN last_rendereds LR ON jobs.id = LR.job_id
|
||||||
|
`
|
||||||
|
|
||||||
|
func (q *Queries) GetLastRenderedJobUUID(ctx context.Context) (string, error) {
|
||||||
|
row := q.db.QueryRowContext(ctx, getLastRenderedJobUUID)
|
||||||
|
var uuid string
|
||||||
|
err := row.Scan(&uuid)
|
||||||
|
return uuid, err
|
||||||
|
}
|
||||||
|
|
||||||
const jobCountTaskStatuses = `-- name: JobCountTaskStatuses :many
|
const jobCountTaskStatuses = `-- name: JobCountTaskStatuses :many
|
||||||
SELECT status, count(*) as num_tasks FROM tasks
|
SELECT status, count(*) as num_tasks FROM tasks
|
||||||
WHERE job_id = ?1
|
WHERE job_id = ?1
|
||||||
@ -771,6 +781,30 @@ func (q *Queries) SaveJobStorageInfo(ctx context.Context, arg SaveJobStorageInfo
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const setLastRendered = `-- name: SetLastRendered :exec
|
||||||
|
INSERT INTO last_rendereds (id, created_at, updated_at, job_id)
|
||||||
|
VALUES (1, ?1, ?2, ?3)
|
||||||
|
ON CONFLICT DO UPDATE
|
||||||
|
SET updated_at=?2, job_id=?3
|
||||||
|
WHERE id=1
|
||||||
|
`
|
||||||
|
|
||||||
|
type SetLastRenderedParams struct {
|
||||||
|
CreatedAt time.Time
|
||||||
|
UpdatedAt sql.NullTime
|
||||||
|
JobID int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the 'last rendered' job info.
|
||||||
|
//
|
||||||
|
// Note that the use of ?2 and ?3 in the SQL is not desirable, and should be
|
||||||
|
// replaced with @updated_at and @job_id as soon as sqlc issue #3334 is fixed.
|
||||||
|
// See https://github.com/sqlc-dev/sqlc/issues/3334 for more info.
|
||||||
|
func (q *Queries) SetLastRendered(ctx context.Context, arg SetLastRenderedParams) error {
|
||||||
|
_, err := q.db.ExecContext(ctx, setLastRendered, arg.CreatedAt, arg.UpdatedAt, arg.JobID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
const taskAssignToWorker = `-- name: TaskAssignToWorker :exec
|
const taskAssignToWorker = `-- name: TaskAssignToWorker :exec
|
||||||
UPDATE tasks SET
|
UPDATE tasks SET
|
||||||
updated_at = ?1,
|
updated_at = ?1,
|
||||||
|
@ -1,7 +1,4 @@
|
|||||||
|
|
||||||
-- Worker queries
|
|
||||||
--
|
|
||||||
|
|
||||||
-- name: CreateWorker :one
|
-- name: CreateWorker :one
|
||||||
INSERT INTO workers (
|
INSERT INTO workers (
|
||||||
created_at,
|
created_at,
|
||||||
|
@ -27,7 +27,6 @@ func (q *Queries) AddWorkerTagMembership(ctx context.Context, arg AddWorkerTagMe
|
|||||||
}
|
}
|
||||||
|
|
||||||
const createWorker = `-- name: CreateWorker :one
|
const createWorker = `-- name: CreateWorker :one
|
||||||
|
|
||||||
INSERT INTO workers (
|
INSERT INTO workers (
|
||||||
created_at,
|
created_at,
|
||||||
uuid,
|
uuid,
|
||||||
@ -79,8 +78,6 @@ type CreateWorkerParams struct {
|
|||||||
CanRestart bool
|
CanRestart bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Worker queries
|
|
||||||
//
|
|
||||||
func (q *Queries) CreateWorker(ctx context.Context, arg CreateWorkerParams) (int64, error) {
|
func (q *Queries) CreateWorker(ctx context.Context, arg CreateWorkerParams) (int64, error) {
|
||||||
row := q.db.QueryRowContext(ctx, createWorker,
|
row := q.db.QueryRowContext(ctx, createWorker,
|
||||||
arg.CreatedAt,
|
arg.CreatedAt,
|
||||||
|
@ -224,7 +224,7 @@ func TestTaskStatusChangeCancelSingleTaskWithOtherFailed(t *testing.T) {
|
|||||||
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled)
|
mocks.expectSaveJobWithStatus(t, job, api.JobStatusCanceled)
|
||||||
mocks.expectBroadcastJobChange(task1.Job, api.JobStatusCancelRequested, api.JobStatusCanceled)
|
mocks.expectBroadcastJobChange(task1.Job, api.JobStatusCancelRequested, api.JobStatusCanceled)
|
||||||
|
|
||||||
// The paused task just stays paused, so don't expectBroadcastTaskChange(task3).
|
// The canceled task just stays canceled, so don't expectBroadcastTaskChange(task3).
|
||||||
|
|
||||||
require.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusCanceled))
|
require.NoError(t, sm.TaskStatusChange(ctx, task1, api.TaskStatusCanceled))
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user