WIP: convert GORM to sqlc, for jobs/tasks #104304
@ -36,6 +36,7 @@ type PersistenceService interface {
|
|||||||
SaveJobPriority(ctx context.Context, job *persistence.Job) error
|
SaveJobPriority(ctx context.Context, job *persistence.Job) error
|
||||||
// FetchTask fetches the given task and the accompanying job.
|
// FetchTask fetches the given task and the accompanying job.
|
||||||
FetchTask(ctx context.Context, taskID string) (*persistence.Task, error)
|
FetchTask(ctx context.Context, taskID string) (*persistence.Task, error)
|
||||||
|
FetchTaskWithJobUUID(ctx context.Context, taskID string) (task *persistence.Task, jobUUID string, err error)
|
||||||
FetchTaskFailureList(context.Context, *persistence.Task) ([]*persistence.Worker, error)
|
FetchTaskFailureList(context.Context, *persistence.Task) ([]*persistence.Worker, error)
|
||||||
SaveTask(ctx context.Context, task *persistence.Task) error
|
SaveTask(ctx context.Context, task *persistence.Task) error
|
||||||
SaveTaskActivity(ctx context.Context, t *persistence.Task) error
|
SaveTaskActivity(ctx context.Context, t *persistence.Task) error
|
||||||
|
@ -439,7 +439,7 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error {
|
|||||||
return sendAPIError(e, http.StatusBadRequest, "bad task ID")
|
return sendAPIError(e, http.StatusBadRequest, "bad task ID")
|
||||||
}
|
}
|
||||||
|
|
||||||
dbTask, err := f.persist.FetchTask(ctx, taskID)
|
dbTask, jobUUID, err := f.persist.FetchTaskWithJobUUID(ctx, taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, persistence.ErrTaskNotFound) {
|
if errors.Is(err, persistence.ErrTaskNotFound) {
|
||||||
return sendAPIError(e, http.StatusNotFound, "no such task")
|
return sendAPIError(e, http.StatusNotFound, "no such task")
|
||||||
@ -447,9 +447,9 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error {
|
|||||||
logger.Error().Err(err).Msg("error fetching task")
|
logger.Error().Err(err).Msg("error fetching task")
|
||||||
return sendAPIError(e, http.StatusInternalServerError, "error fetching task: %v", err)
|
return sendAPIError(e, http.StatusInternalServerError, "error fetching task: %v", err)
|
||||||
}
|
}
|
||||||
logger = logger.With().Str("job", dbTask.Job.UUID).Logger()
|
logger = logger.With().Str("job", jobUUID).Logger()
|
||||||
|
|
||||||
size, err := f.logStorage.TaskLogSize(dbTask.Job.UUID, taskID)
|
size, err := f.logStorage.TaskLogSize(jobUUID, taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, os.ErrNotExist) {
|
if errors.Is(err, os.ErrNotExist) {
|
||||||
logger.Debug().Msg("task log unavailable, task has no log on disk")
|
logger.Debug().Msg("task log unavailable, task has no log on disk")
|
||||||
@ -475,11 +475,11 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error {
|
|||||||
|
|
||||||
taskLogInfo := api.TaskLogInfo{
|
taskLogInfo := api.TaskLogInfo{
|
||||||
TaskId: taskID,
|
TaskId: taskID,
|
||||||
JobId: dbTask.Job.UUID,
|
JobId: jobUUID,
|
||||||
Size: int(size),
|
Size: int(size),
|
||||||
}
|
}
|
||||||
|
|
||||||
fullLogPath := f.logStorage.Filepath(dbTask.Job.UUID, taskID)
|
fullLogPath := f.logStorage.Filepath(jobUUID, taskID)
|
||||||
relPath, err := f.localStorage.RelPath(fullLogPath)
|
relPath, err := f.localStorage.RelPath(fullLogPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error().Err(err).Msg("task log is outside the manager storage, cannot construct its URL for download")
|
logger.Error().Err(err).Msg("task log is outside the manager storage, cannot construct its URL for download")
|
||||||
|
@ -454,18 +454,45 @@ func (db *DB) SaveJobStorageInfo(ctx context.Context, j *Job) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
|
func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
|
||||||
dbTask := Task{}
|
queries, err := db.queries()
|
||||||
tx := db.gormDB.WithContext(ctx).
|
if err != nil {
|
||||||
// Allow finding the Worker, even after it was deleted. Jobs and Tasks
|
return nil, err
|
||||||
// don't have soft-deletion.
|
|
||||||
Unscoped().
|
|
||||||
Joins("Job").
|
|
||||||
Joins("Worker").
|
|
||||||
First(&dbTask, "tasks.uuid = ?", taskUUID)
|
|
||||||
if tx.Error != nil {
|
|
||||||
return nil, taskError(tx.Error, "fetching task")
|
|
||||||
}
|
}
|
||||||
return &dbTask, nil
|
|
||||||
|
task, err := queries.FetchTask(ctx, taskUUID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, taskError(err, "fetching task %s", taskUUID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return convertSqlcTask(task)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchTaskWithJobUUID fetches a Task, and includes the job's UUID.
|
||||||
|
// No other job information is fetched.
|
||||||
|
func (db *DB) FetchTaskWithJobUUID(
|
||||||
|
ctx context.Context,
|
||||||
|
taskUUID string,
|
||||||
|
) (task *Task, jobUUID string, err error) {
|
||||||
|
queries, err := db.queries()
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchTaskRow, err := queries.FetchTaskWithJobUUID(ctx, taskUUID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", taskError(err, "fetching task %s", taskUUID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !fetchTaskRow.JobUUID.Valid {
|
||||||
|
return nil, "", taskError(err, "task %s has non-existing job id=%d", taskUUID, fetchTaskRow.Task.JobID)
|
||||||
|
}
|
||||||
|
|
||||||
|
task, err = convertSqlcTask(fetchTaskRow.Task)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return task, fetchTaskRow.JobUUID.String, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) SaveTask(ctx context.Context, t *Task) error {
|
func (db *DB) SaveTask(ctx context.Context, t *Task) error {
|
||||||
@ -791,3 +818,40 @@ func convertSqlcJob(job sqlc.Job) (*Job, error) {
|
|||||||
|
|
||||||
return &dbJob, nil
|
return &dbJob, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// convertSqlcTask converts a task from the SQLC-generated model to the model
|
||||||
|
// expected by the rest of the code. This is mostly in place to aid in the GORM
|
||||||
|
// to SQLC migration. It is intended that eventually the rest of the code will
|
||||||
|
// use the same SQLC-generated model.
|
||||||
|
func convertSqlcTask(task sqlc.Task) (*Task, error) {
|
||||||
|
dbTask := Task{
|
||||||
|
Model: Model{
|
||||||
|
ID: uint(task.ID),
|
||||||
|
CreatedAt: task.CreatedAt,
|
||||||
|
UpdatedAt: task.UpdatedAt.Time,
|
||||||
|
},
|
||||||
|
|
||||||
|
UUID: task.UUID,
|
||||||
|
Name: task.Name,
|
||||||
|
Type: task.Type,
|
||||||
|
Priority: int(task.Priority),
|
||||||
|
Status: api.TaskStatus(task.Status),
|
||||||
|
LastTouchedAt: task.LastTouchedAt.Time,
|
||||||
|
Activity: task.Activity,
|
||||||
|
|
||||||
|
JobID: uint(task.JobID),
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: convert dependencies?
|
||||||
|
|
||||||
|
if task.WorkerID.Valid {
|
||||||
|
workerID := uint(task.WorkerID.Int64)
|
||||||
|
dbTask.WorkerID = &workerID
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(task.Commands, &dbTask.Commands); err != nil {
|
||||||
|
return nil, taskError(err, fmt.Sprintf("task %s of job %s has invalid commands: %v", task.UUID, job.UUID, err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &dbTask, nil
|
||||||
|
}
|
||||||
|
@ -238,9 +238,12 @@ func TestCountTaskFailuresOfWorker(t *testing.T) {
|
|||||||
ctx, close, db, dbJob, authoredJob := jobTasksTestFixtures(t)
|
ctx, close, db, dbJob, authoredJob := jobTasksTestFixtures(t)
|
||||||
defer close()
|
defer close()
|
||||||
|
|
||||||
task0, _ := db.FetchTask(ctx, authoredJob.Tasks[0].UUID)
|
task0, err := db.FetchTask(ctx, authoredJob.Tasks[0].UUID)
|
||||||
task1, _ := db.FetchTask(ctx, authoredJob.Tasks[1].UUID)
|
require.NoError(t, err)
|
||||||
task2, _ := db.FetchTask(ctx, authoredJob.Tasks[2].UUID)
|
task1, err := db.FetchTask(ctx, authoredJob.Tasks[1].UUID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
task2, err := db.FetchTask(ctx, authoredJob.Tasks[2].UUID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Sanity check on the test data.
|
// Sanity check on the test data.
|
||||||
assert.Equal(t, "blender", task0.Type)
|
assert.Equal(t, "blender", task0.Type)
|
||||||
|
@ -55,3 +55,12 @@ UPDATE jobs SET updated_at=@now, priority=@priority WHERE id=@id;
|
|||||||
|
|
||||||
-- name: SaveJobStorageInfo :exec
|
-- name: SaveJobStorageInfo :exec
|
||||||
UPDATE jobs SET storage_shaman_checkout_id=@storage_shaman_checkout_id WHERE id=@id;
|
UPDATE jobs SET storage_shaman_checkout_id=@storage_shaman_checkout_id WHERE id=@id;
|
||||||
|
|
||||||
|
-- name: FetchTask :one
|
||||||
|
SELECT * FROM tasks WHERE tasks.uuid = @uuid;
|
||||||
|
|
||||||
|
-- name: FetchTaskWithJobUUID :one
|
||||||
|
SELECT sqlc.embed(tasks), jobs.UUID as jobUUID
|
||||||
|
FROM tasks
|
||||||
|
LEFT JOIN jobs ON (tasks.job_id = jobs.id)
|
||||||
|
WHERE tasks.uuid = @uuid;
|
||||||
|
@ -204,6 +204,65 @@ func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []string) ([]J
|
|||||||
return items, nil
|
return items, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const fetchTask = `-- name: FetchTask :one
|
||||||
|
SELECT id, created_at, updated_at, uuid, name, type, job_id, priority, status, worker_id, last_touched_at, commands, activity FROM tasks WHERE tasks.uuid = ?1
|
||||||
|
`
|
||||||
|
|
||||||
|
func (q *Queries) FetchTask(ctx context.Context, uuid string) (Task, error) {
|
||||||
|
row := q.db.QueryRowContext(ctx, fetchTask, uuid)
|
||||||
|
var i Task
|
||||||
|
err := row.Scan(
|
||||||
|
&i.ID,
|
||||||
|
&i.CreatedAt,
|
||||||
|
&i.UpdatedAt,
|
||||||
|
&i.UUID,
|
||||||
|
&i.Name,
|
||||||
|
&i.Type,
|
||||||
|
&i.JobID,
|
||||||
|
&i.Priority,
|
||||||
|
&i.Status,
|
||||||
|
&i.WorkerID,
|
||||||
|
&i.LastTouchedAt,
|
||||||
|
&i.Commands,
|
||||||
|
&i.Activity,
|
||||||
|
)
|
||||||
|
return i, err
|
||||||
|
}
|
||||||
|
|
||||||
|
const fetchTaskWithJobUUID = `-- name: FetchTaskWithJobUUID :one
|
||||||
|
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID
|
||||||
|
FROM tasks
|
||||||
|
LEFT JOIN jobs ON (tasks.job_id = jobs.id)
|
||||||
|
WHERE tasks.uuid = ?1
|
||||||
|
`
|
||||||
|
|
||||||
|
type FetchTaskWithJobUUIDRow struct {
|
||||||
|
Task Task
|
||||||
|
JobUUID sql.NullString
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queries) FetchTaskWithJobUUID(ctx context.Context, uuid string) (FetchTaskWithJobUUIDRow, error) {
|
||||||
|
row := q.db.QueryRowContext(ctx, fetchTaskWithJobUUID, uuid)
|
||||||
|
var i FetchTaskWithJobUUIDRow
|
||||||
|
err := row.Scan(
|
||||||
|
&i.Task.ID,
|
||||||
|
&i.Task.CreatedAt,
|
||||||
|
&i.Task.UpdatedAt,
|
||||||
|
&i.Task.UUID,
|
||||||
|
&i.Task.Name,
|
||||||
|
&i.Task.Type,
|
||||||
|
&i.Task.JobID,
|
||||||
|
&i.Task.Priority,
|
||||||
|
&i.Task.Status,
|
||||||
|
&i.Task.WorkerID,
|
||||||
|
&i.Task.LastTouchedAt,
|
||||||
|
&i.Task.Commands,
|
||||||
|
&i.Task.Activity,
|
||||||
|
&i.JobUUID,
|
||||||
|
)
|
||||||
|
return i, err
|
||||||
|
}
|
||||||
|
|
||||||
const requestJobDeletion = `-- name: RequestJobDeletion :exec
|
const requestJobDeletion = `-- name: RequestJobDeletion :exec
|
||||||
UPDATE jobs SET
|
UPDATE jobs SET
|
||||||
updated_at = ?1,
|
updated_at = ?1,
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
|
||||||
"projects.blender.org/studio/flamenco/pkg/api"
|
"projects.blender.org/studio/flamenco/pkg/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -207,3 +208,34 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e
|
|||||||
|
|
||||||
return statusCounts, nil
|
return statusCounts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// convertSqlcWorker converts a worker from the SQLC-generated model to the model
|
||||||
|
// expected by the rest of the code. This is mostly in place to aid in the GORM
|
||||||
|
// to SQLC migration. It is intended that eventually the rest of the code will
|
||||||
|
// use the same SQLC-generated model.
|
||||||
|
func convertSqlcWorker(worker sqlc.Worker) (*Worker, error) {
|
||||||
|
dbWorker := Worker{
|
||||||
|
Model: Model{
|
||||||
|
ID: uint(worker.ID),
|
||||||
|
CreatedAt: worker.CreatedAt,
|
||||||
|
UpdatedAt: worker.UpdatedAt.Time,
|
||||||
|
},
|
||||||
|
DeletedAt: gorm.DeletedAt(worker.DeletedAt),
|
||||||
|
|
||||||
|
UUID: worker.UUID,
|
||||||
|
Secret: worker.Secret,
|
||||||
|
Name: worker.Name,
|
||||||
|
Address: worker.Address,
|
||||||
|
Platform: worker.Platform,
|
||||||
|
Software: worker.Software,
|
||||||
|
Status: api.WorkerStatus(worker.Status),
|
||||||
|
LastSeenAt: worker.LastSeenAt.Time,
|
||||||
|
CanRestart: worker.CanRestart != 0,
|
||||||
|
StatusRequested: api.WorkerStatus(worker.StatusRequested),
|
||||||
|
LazyStatusRequest: worker.LazyStatusRequest != 0,
|
||||||
|
SupportedTaskTypes: worker.SupportedTaskTypes,
|
||||||
|
// TODO: Tags []*WorkerTag `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"`
|
||||||
|
}
|
||||||
|
|
||||||
|
return &dbWorker, nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user