WIP: convert GORM to sqlc, for jobs/tasks #104304

Closed
Sybren A. Stüvel wants to merge 27 commits from sqlc-task into main

When changing the target branch, be careful to rebase the branch in your fork to match. See documentation.
7 changed files with 90 additions and 82 deletions
Showing only changes of commit 567adfca32 - Show all commits

View File

@ -36,7 +36,8 @@ type PersistenceService interface {
SaveJobPriority(ctx context.Context, job *persistence.Job) error SaveJobPriority(ctx context.Context, job *persistence.Job) error
// FetchTask fetches the given task and the accompanying job. // FetchTask fetches the given task and the accompanying job.
FetchTask(ctx context.Context, taskID string) (*persistence.Task, error) FetchTask(ctx context.Context, taskID string) (*persistence.Task, error)
FetchTaskWithJobUUID(ctx context.Context, taskID string) (task *persistence.Task, jobUUID string, err error) // FetchTaskJobUUID fetches the UUID of the job this task belongs to.
FetchTaskJobUUID(ctx context.Context, taskID string) (string, error)
FetchTaskFailureList(context.Context, *persistence.Task) ([]*persistence.Worker, error) FetchTaskFailureList(context.Context, *persistence.Task) ([]*persistence.Worker, error)
SaveTask(ctx context.Context, task *persistence.Task) error SaveTask(ctx context.Context, task *persistence.Task) error
SaveTaskActivity(ctx context.Context, t *persistence.Task) error SaveTaskActivity(ctx context.Context, t *persistence.Task) error

View File

@ -439,7 +439,7 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error {
return sendAPIError(e, http.StatusBadRequest, "bad task ID") return sendAPIError(e, http.StatusBadRequest, "bad task ID")
} }
dbTask, jobUUID, err := f.persist.FetchTaskWithJobUUID(ctx, taskID) jobUUID, err := f.persist.FetchTaskJobUUID(ctx, taskID)
if err != nil { if err != nil {
if errors.Is(err, persistence.ErrTaskNotFound) { if errors.Is(err, persistence.ErrTaskNotFound) {
return sendAPIError(e, http.StatusNotFound, "no such task") return sendAPIError(e, http.StatusNotFound, "no such task")
@ -501,7 +501,7 @@ func (f *Flamenco) FetchTaskLogTail(e echo.Context, taskID string) error {
return sendAPIError(e, http.StatusBadRequest, "bad task ID") return sendAPIError(e, http.StatusBadRequest, "bad task ID")
} }
dbTask, err := f.persist.FetchTask(ctx, taskID) jobUUID, err := f.persist.FetchTaskJobUUID(ctx, taskID)
if err != nil { if err != nil {
if errors.Is(err, persistence.ErrTaskNotFound) { if errors.Is(err, persistence.ErrTaskNotFound) {
return sendAPIError(e, http.StatusNotFound, "no such task") return sendAPIError(e, http.StatusNotFound, "no such task")
@ -509,9 +509,9 @@ func (f *Flamenco) FetchTaskLogTail(e echo.Context, taskID string) error {
logger.Error().Err(err).Msg("error fetching task") logger.Error().Err(err).Msg("error fetching task")
return sendAPIError(e, http.StatusInternalServerError, "error fetching task: %v", err) return sendAPIError(e, http.StatusInternalServerError, "error fetching task: %v", err)
} }
logger = logger.With().Str("job", dbTask.Job.UUID).Logger() logger = logger.With().Str("job", jobUUID).Logger()
tail, err := f.logStorage.Tail(dbTask.Job.UUID, taskID) tail, err := f.logStorage.Tail(jobUUID, taskID)
if err != nil { if err != nil {
if errors.Is(err, os.ErrNotExist) { if errors.Is(err, os.ErrNotExist) {
logger.Debug().Msg("task tail unavailable, task has no log on disk") logger.Debug().Msg("task tail unavailable, task has no log on disk")
@ -700,7 +700,11 @@ func taskDBtoAPI(dbTask *persistence.Task) api.Task {
Status: dbTask.Status, Status: dbTask.Status,
Activity: dbTask.Activity, Activity: dbTask.Activity,
Commands: make([]api.Command, len(dbTask.Commands)), Commands: make([]api.Command, len(dbTask.Commands)),
// TODO: convert this to just store dbTask.WorkerUUID.
Worker: workerToTaskWorker(dbTask.Worker), Worker: workerToTaskWorker(dbTask.Worker),
JobId: dbTask.JobUUID,
} }
if dbTask.Job != nil { if dbTask.Job != nil {

View File

@ -66,12 +66,14 @@ type Task struct {
Type string `gorm:"type:varchar(32);default:''"` Type string `gorm:"type:varchar(32);default:''"`
JobID uint `gorm:"default:0"` JobID uint `gorm:"default:0"`
Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"` Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"`
JobUUID string `gorm:"-"` // Fetched by SQLC, not GORM.
Priority int `gorm:"type:smallint;default:50"` Priority int `gorm:"type:smallint;default:50"`
Status api.TaskStatus `gorm:"type:varchar(16);default:''"` Status api.TaskStatus `gorm:"type:varchar(16);default:''"`
// Which worker is/was working on this. // Which worker is/was working on this.
WorkerID *uint WorkerID *uint
Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:SET NULL"` Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:SET NULL"`
WorkerUUID string `gorm:"-"` // Fetched by SQLC, not GORM.
LastTouchedAt time.Time `gorm:"index"` // Should contain UTC timestamps. LastTouchedAt time.Time `gorm:"index"` // Should contain UTC timestamps.
// Dependencies are tasks that need to be completed before this one can run. // Dependencies are tasks that need to be completed before this one can run.
@ -459,40 +461,28 @@ func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
return nil, err return nil, err
} }
task, err := queries.FetchTask(ctx, taskUUID) taskRow, err := queries.FetchTask(ctx, taskUUID)
if err != nil { if err != nil {
return nil, taskError(err, "fetching task %s", taskUUID) return nil, taskError(err, "fetching task %s", taskUUID)
} }
return convertSqlcTask(task) return convertSqlcTask(taskRow)
} }
// FetchTaskWithJobUUID fetches a Task, and includes the job's UUID. func (db *DB) FetchTaskJobUUID(ctx context.Context, taskUUID string) (string, error) {
// No other job information is fetched.
func (db *DB) FetchTaskWithJobUUID(
ctx context.Context,
taskUUID string,
) (task *Task, jobUUID string, err error) {
queries, err := db.queries() queries, err := db.queries()
if err != nil { if err != nil {
return nil, "", err return "", err
} }
fetchTaskRow, err := queries.FetchTaskWithJobUUID(ctx, taskUUID) jobUUID, err := queries.FetchTaskJobUUID(ctx, taskUUID)
if err != nil { if err != nil {
return nil, "", taskError(err, "fetching task %s", taskUUID) return "", taskError(err, "fetching job UUID of task %s", taskUUID)
} }
if !jobUUID.Valid {
if !fetchTaskRow.JobUUID.Valid { return "", PersistenceError{Message: fmt.Sprintf("unable to find job of task %s", taskUUID)}
return nil, "", taskError(err, "task %s has non-existing job id=%d", taskUUID, fetchTaskRow.Task.JobID)
} }
return jobUUID.String, nil
task, err = convertSqlcTask(fetchTaskRow.Task)
if err != nil {
return nil, "", err
}
return task, fetchTaskRow.JobUUID.String, nil
} }
func (db *DB) SaveTask(ctx context.Context, t *Task) error { func (db *DB) SaveTask(ctx context.Context, t *Task) error {
@ -819,38 +809,41 @@ func convertSqlcJob(job sqlc.Job) (*Job, error) {
return &dbJob, nil return &dbJob, nil
} }
// convertSqlcTask converts a task from the SQLC-generated model to the model // convertSqlcTask converts a FetchTaskRow from the SQLC-generated model to the
// expected by the rest of the code. This is mostly in place to aid in the GORM // model expected by the rest of the code. This is mostly in place to aid in the
// to SQLC migration. It is intended that eventually the rest of the code will // GORM to SQLC migration. It is intended that eventually the rest of the code
// use the same SQLC-generated model. // will use the same SQLC-generated model.
func convertSqlcTask(task sqlc.Task) (*Task, error) { func convertSqlcTask(taskRow sqlc.FetchTaskRow) (*Task, error) {
dbTask := Task{ dbTask := Task{
Model: Model{ Model: Model{
ID: uint(task.ID), ID: uint(taskRow.Task.ID),
CreatedAt: task.CreatedAt, CreatedAt: taskRow.Task.CreatedAt,
UpdatedAt: task.UpdatedAt.Time, UpdatedAt: taskRow.Task.UpdatedAt.Time,
}, },
UUID: task.UUID, UUID: taskRow.Task.UUID,
Name: task.Name, Name: taskRow.Task.Name,
Type: task.Type, Type: taskRow.Task.Type,
Priority: int(task.Priority), Priority: int(taskRow.Task.Priority),
Status: api.TaskStatus(task.Status), Status: api.TaskStatus(taskRow.Task.Status),
LastTouchedAt: task.LastTouchedAt.Time, LastTouchedAt: taskRow.Task.LastTouchedAt.Time,
Activity: task.Activity, Activity: taskRow.Task.Activity,
JobID: uint(task.JobID), JobID: uint(taskRow.Task.JobID),
JobUUID: taskRow.JobUUID.String,
WorkerUUID: taskRow.WorkerUUID.String,
} }
// TODO: convert dependencies? // TODO: convert dependencies?
if task.WorkerID.Valid { if taskRow.Task.WorkerID.Valid {
workerID := uint(task.WorkerID.Int64) workerID := uint(taskRow.Task.WorkerID.Int64)
dbTask.WorkerID = &workerID dbTask.WorkerID = &workerID
} }
if err := json.Unmarshal(task.Commands, &dbTask.Commands); err != nil { if err := json.Unmarshal(taskRow.Task.Commands, &dbTask.Commands); err != nil {
return nil, taskError(err, fmt.Sprintf("task %s of job %s has invalid commands: %v", task.UUID, job.UUID, err)) return nil, taskError(err, fmt.Sprintf("task %s of job %s has invalid commands: %v",
taskRow.Task.UUID, taskRow.JobUUID.String, err))
} }
return &dbTask, nil return &dbTask, nil

View File

@ -75,6 +75,19 @@ func TestStoreAuthoredJobWithShamanCheckoutID(t *testing.T) {
assert.Equal(t, job.Storage.ShamanCheckoutID, fetchedJob.Storage.ShamanCheckoutID) assert.Equal(t, job.Storage.ShamanCheckoutID, fetchedJob.Storage.ShamanCheckoutID)
} }
func TestFetchTaskJobUUID(t *testing.T) {
ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second)
defer cancel()
job := createTestAuthoredJobWithTasks()
err := db.StoreAuthoredJob(ctx, job)
require.NoError(t, err)
jobUUID, err := db.FetchTaskJobUUID(ctx, job.Tasks[0].UUID)
require.NoError(t, err)
assert.Equal(t, job.JobID, jobUUID)
}
func TestSaveJobStorageInfo(t *testing.T) { func TestSaveJobStorageInfo(t *testing.T) {
// Test that saving job storage info doesn't count as "update". // Test that saving job storage info doesn't count as "update".
// This is necessary for `cmd/shaman-checkout-id-setter` to do its work quietly. // This is necessary for `cmd/shaman-checkout-id-setter` to do its work quietly.

View File

@ -57,10 +57,14 @@ UPDATE jobs SET updated_at=@now, priority=@priority WHERE id=@id;
UPDATE jobs SET storage_shaman_checkout_id=@storage_shaman_checkout_id WHERE id=@id; UPDATE jobs SET storage_shaman_checkout_id=@storage_shaman_checkout_id WHERE id=@id;
-- name: FetchTask :one -- name: FetchTask :one
SELECT * FROM tasks WHERE tasks.uuid = @uuid; SELECT sqlc.embed(tasks), jobs.UUID as jobUUID, workers.UUID as workerUUID
FROM tasks
LEFT JOIN jobs ON (tasks.job_id = jobs.id)
LEFT JOIN workers ON (tasks.worker_id = workers.id)
WHERE tasks.uuid = @uuid;
-- name: FetchTaskWithJobUUID :one -- name: FetchTaskJobUUID :one
SELECT sqlc.embed(tasks), jobs.UUID as jobUUID SELECT jobs.UUID as jobUUID
FROM tasks FROM tasks
LEFT JOIN jobs ON (tasks.job_id = jobs.id) LEFT JOIN jobs ON (tasks.job_id = jobs.id)
WHERE tasks.uuid = @uuid; WHERE tasks.uuid = @uuid;

View File

@ -205,45 +205,22 @@ func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []string) ([]J
} }
const fetchTask = `-- name: FetchTask :one const fetchTask = `-- name: FetchTask :one
SELECT id, created_at, updated_at, uuid, name, type, job_id, priority, status, worker_id, last_touched_at, commands, activity FROM tasks WHERE tasks.uuid = ?1 SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID, workers.UUID as workerUUID
`
func (q *Queries) FetchTask(ctx context.Context, uuid string) (Task, error) {
row := q.db.QueryRowContext(ctx, fetchTask, uuid)
var i Task
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.UUID,
&i.Name,
&i.Type,
&i.JobID,
&i.Priority,
&i.Status,
&i.WorkerID,
&i.LastTouchedAt,
&i.Commands,
&i.Activity,
)
return i, err
}
const fetchTaskWithJobUUID = `-- name: FetchTaskWithJobUUID :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID
FROM tasks FROM tasks
LEFT JOIN jobs ON (tasks.job_id = jobs.id) LEFT JOIN jobs ON (tasks.job_id = jobs.id)
LEFT JOIN workers ON (tasks.worker_id = workers.id)
WHERE tasks.uuid = ?1 WHERE tasks.uuid = ?1
` `
type FetchTaskWithJobUUIDRow struct { type FetchTaskRow struct {
Task Task Task Task
JobUUID sql.NullString JobUUID sql.NullString
WorkerUUID sql.NullString
} }
func (q *Queries) FetchTaskWithJobUUID(ctx context.Context, uuid string) (FetchTaskWithJobUUIDRow, error) { func (q *Queries) FetchTask(ctx context.Context, uuid string) (FetchTaskRow, error) {
row := q.db.QueryRowContext(ctx, fetchTaskWithJobUUID, uuid) row := q.db.QueryRowContext(ctx, fetchTask, uuid)
var i FetchTaskWithJobUUIDRow var i FetchTaskRow
err := row.Scan( err := row.Scan(
&i.Task.ID, &i.Task.ID,
&i.Task.CreatedAt, &i.Task.CreatedAt,
@ -259,10 +236,25 @@ func (q *Queries) FetchTaskWithJobUUID(ctx context.Context, uuid string) (FetchT
&i.Task.Commands, &i.Task.Commands,
&i.Task.Activity, &i.Task.Activity,
&i.JobUUID, &i.JobUUID,
&i.WorkerUUID,
) )
return i, err return i, err
} }
const fetchTaskJobUUID = `-- name: FetchTaskJobUUID :one
SELECT jobs.UUID as jobUUID
FROM tasks
LEFT JOIN jobs ON (tasks.job_id = jobs.id)
WHERE tasks.uuid = ?1
`
func (q *Queries) FetchTaskJobUUID(ctx context.Context, uuid string) (sql.NullString, error) {
row := q.db.QueryRowContext(ctx, fetchTaskJobUUID, uuid)
var jobuuid sql.NullString
err := row.Scan(&jobuuid)
return jobuuid, err
}
const requestJobDeletion = `-- name: RequestJobDeletion :exec const requestJobDeletion = `-- name: RequestJobDeletion :exec
UPDATE jobs SET UPDATE jobs SET
updated_at = ?1, updated_at = ?1,

View File

@ -15,3 +15,4 @@ sql:
uuid: "UUID" uuid: "UUID"
uuids: "UUIDs" uuids: "UUIDs"
jobuuid: "JobUUID" jobuuid: "JobUUID"
workeruuid: "WorkerUUID"