WIP: convert GORM to sqlc, for jobs/tasks #104304

Closed
Sybren A. Stüvel wants to merge 27 commits from sqlc-task into main

When changing the target branch, be careful to rebase the branch in your fork to match. See documentation.
3 changed files with 151 additions and 36 deletions
Showing only changes of commit febc88fd9f - Show all commits

View File

@ -466,7 +466,7 @@ func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
return nil, taskError(err, "fetching task %s", taskUUID) return nil, taskError(err, "fetching task %s", taskUUID)
} }
convertedTask, err := convertSqlcTask(taskRow) convertedTask, err := convertSqlcTask(taskRow.Task, taskRow.JobUUID.String, taskRow.WorkerUUID.String)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -605,32 +605,60 @@ func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error {
return nil return nil
} }
// TaskAssignToWorker assigns the given task to the given worker.
// This function is only used by unit tests. During normal operation, Flamenco
// uses the code in task_scheduler.go to assign tasks to workers.
func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error { func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error {
tx := db.gormDB.WithContext(ctx). queries, err := db.queries()
Model(t). if err != nil {
Select("WorkerID"). return err
Updates(Task{WorkerID: &w.ID})
if tx.Error != nil {
return taskError(tx.Error, "assigning task %s to worker %s", t.UUID, w.UUID)
} }
// Gorm updates t.WorkerID itself, but not t.Worker (even when it's added to err = queries.TaskAssignToWorker(ctx, sqlc.TaskAssignToWorkerParams{
// the Updates() call above). UpdatedAt: db.now(),
WorkerID: sql.NullInt64{
Int64: int64(w.ID),
Valid: true,
},
ID: int64(t.ID),
})
if err != nil {
return taskError(err, "assigning task %s to worker %s", t.UUID, w.UUID)
}
// Update the task itself.
t.Worker = w t.Worker = w
t.WorkerID = &w.ID
return nil return nil
} }
func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, taskStatus api.TaskStatus) ([]*Task, error) { func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, taskStatus api.TaskStatus) ([]*Task, error) {
result := []*Task{} queries, err := db.queries()
tx := db.gormDB.WithContext(ctx). if err != nil {
Model(&Task{}). return nil, err
Joins("Job"). }
Where("tasks.worker_id = ?", worker.ID).
Where("tasks.status = ?", taskStatus). rows, err := queries.FetchTasksOfWorkerInStatus(ctx, sqlc.FetchTasksOfWorkerInStatusParams{
Scan(&result) WorkerID: sql.NullInt64{
if tx.Error != nil { Int64: int64(worker.ID),
return nil, taskError(tx.Error, "finding tasks of worker %s in status %q", worker.UUID, taskStatus) Valid: true,
},
TaskStatus: string(taskStatus),
})
if err != nil {
return nil, taskError(err, "finding tasks of worker %s in status %q", worker.UUID, taskStatus)
}
result := make([]*Task, len(rows))
for i := range rows {
gormTask, err := convertSqlcTask(rows[i].Task, rows[i].JobUUID.String, worker.UUID)
if err != nil {
return nil, err
}
gormTask.Worker = worker
gormTask.WorkerID = &worker.ID
result[i] = gormTask
} }
return result, nil return result, nil
} }
@ -902,37 +930,37 @@ func convertSqlcJob(job sqlc.Job) (*Job, error) {
// model expected by the rest of the code. This is mostly in place to aid in the // model expected by the rest of the code. This is mostly in place to aid in the
// GORM to SQLC migration. It is intended that eventually the rest of the code // GORM to SQLC migration. It is intended that eventually the rest of the code
// will use the same SQLC-generated model. // will use the same SQLC-generated model.
func convertSqlcTask(taskRow sqlc.FetchTaskRow) (*Task, error) { func convertSqlcTask(task sqlc.Task, jobUUID string, workerUUID string) (*Task, error) {
dbTask := Task{ dbTask := Task{
Model: Model{ Model: Model{
ID: uint(taskRow.Task.ID), ID: uint(task.ID),
CreatedAt: taskRow.Task.CreatedAt, CreatedAt: task.CreatedAt,
UpdatedAt: taskRow.Task.UpdatedAt.Time, UpdatedAt: task.UpdatedAt.Time,
}, },
UUID: taskRow.Task.UUID, UUID: task.UUID,
Name: taskRow.Task.Name, Name: task.Name,
Type: taskRow.Task.Type, Type: task.Type,
Priority: int(taskRow.Task.Priority), Priority: int(task.Priority),
Status: api.TaskStatus(taskRow.Task.Status), Status: api.TaskStatus(task.Status),
LastTouchedAt: taskRow.Task.LastTouchedAt.Time, LastTouchedAt: task.LastTouchedAt.Time,
Activity: taskRow.Task.Activity, Activity: task.Activity,
JobID: uint(taskRow.Task.JobID), JobID: uint(task.JobID),
JobUUID: taskRow.JobUUID.String, JobUUID: jobUUID,
WorkerUUID: taskRow.WorkerUUID.String, WorkerUUID: workerUUID,
} }
// TODO: convert dependencies? // TODO: convert dependencies?
if taskRow.Task.WorkerID.Valid { if task.WorkerID.Valid {
workerID := uint(taskRow.Task.WorkerID.Int64) workerID := uint(task.WorkerID.Int64)
dbTask.WorkerID = &workerID dbTask.WorkerID = &workerID
} }
if err := json.Unmarshal(taskRow.Task.Commands, &dbTask.Commands); err != nil { if err := json.Unmarshal(task.Commands, &dbTask.Commands); err != nil {
return nil, taskError(err, fmt.Sprintf("task %s of job %s has invalid commands: %v", return nil, taskError(err, fmt.Sprintf("task %s of job %s has invalid commands: %v",
taskRow.Task.UUID, taskRow.JobUUID.String, err)) task.UUID, jobUUID, err))
} }
return &dbTask, nil return &dbTask, nil

View File

@ -69,6 +69,13 @@ LEFT JOIN jobs ON (tasks.job_id = jobs.id)
LEFT JOIN workers ON (tasks.worker_id = workers.id) LEFT JOIN workers ON (tasks.worker_id = workers.id)
WHERE tasks.uuid = @uuid; WHERE tasks.uuid = @uuid;
-- name: FetchTasksOfWorkerInStatus :many
SELECT sqlc.embed(tasks), jobs.UUID as jobUUID
FROM tasks
LEFT JOIN jobs ON (tasks.job_id = jobs.id)
WHERE tasks.worker_id = @worker_id
AND tasks.status = @task_status;
-- name: FetchTaskJobUUID :one -- name: FetchTaskJobUUID :one
SELECT jobs.UUID as jobUUID SELECT jobs.UUID as jobUUID
FROM tasks FROM tasks
@ -100,3 +107,9 @@ UPDATE tasks SET
updated_at = @updated_at, updated_at = @updated_at,
activity = @activity activity = @activity
WHERE id=@id; WHERE id=@id;
-- name: TaskAssignToWorker :exec
UPDATE tasks SET
updated_at = @updated_at,
worker_id = @worker_id
WHERE id=@id;

View File

@ -285,6 +285,62 @@ func (q *Queries) FetchTaskJobUUID(ctx context.Context, uuid string) (sql.NullSt
return jobuuid, err return jobuuid, err
} }
const fetchTasksOfWorkerInStatus = `-- name: FetchTasksOfWorkerInStatus :many
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID
FROM tasks
LEFT JOIN jobs ON (tasks.job_id = jobs.id)
WHERE tasks.worker_id = ?1
AND tasks.status = ?2
`
type FetchTasksOfWorkerInStatusParams struct {
WorkerID sql.NullInt64
TaskStatus string
}
type FetchTasksOfWorkerInStatusRow struct {
Task Task
JobUUID sql.NullString
}
func (q *Queries) FetchTasksOfWorkerInStatus(ctx context.Context, arg FetchTasksOfWorkerInStatusParams) ([]FetchTasksOfWorkerInStatusRow, error) {
rows, err := q.db.QueryContext(ctx, fetchTasksOfWorkerInStatus, arg.WorkerID, arg.TaskStatus)
if err != nil {
return nil, err
}
defer rows.Close()
var items []FetchTasksOfWorkerInStatusRow
for rows.Next() {
var i FetchTasksOfWorkerInStatusRow
if err := rows.Scan(
&i.Task.ID,
&i.Task.CreatedAt,
&i.Task.UpdatedAt,
&i.Task.UUID,
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
&i.Task.LastTouchedAt,
&i.Task.Commands,
&i.Task.Activity,
&i.JobUUID,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const requestJobDeletion = `-- name: RequestJobDeletion :exec const requestJobDeletion = `-- name: RequestJobDeletion :exec
UPDATE jobs SET UPDATE jobs SET
updated_at = ?1, updated_at = ?1,
@ -380,6 +436,24 @@ func (q *Queries) SaveJobStorageInfo(ctx context.Context, arg SaveJobStorageInfo
return err return err
} }
const taskAssignToWorker = `-- name: TaskAssignToWorker :exec
UPDATE tasks SET
updated_at = ?1,
worker_id = ?2
WHERE id=?3
`
type TaskAssignToWorkerParams struct {
UpdatedAt sql.NullTime
WorkerID sql.NullInt64
ID int64
}
func (q *Queries) TaskAssignToWorker(ctx context.Context, arg TaskAssignToWorkerParams) error {
_, err := q.db.ExecContext(ctx, taskAssignToWorker, arg.UpdatedAt, arg.WorkerID, arg.ID)
return err
}
const updateTask = `-- name: UpdateTask :exec const updateTask = `-- name: UpdateTask :exec
UPDATE tasks SET UPDATE tasks SET
updated_at = ?1, updated_at = ?1,