WIP: convert GORM to sqlc, for jobs/tasks #104304

Closed
Sybren A. Stüvel wants to merge 27 commits from sqlc-task into main

When changing the target branch, be careful to rebase the branch in your fork to match. See documentation.
3 changed files with 166 additions and 36 deletions
Showing only changes of commit 00e904d4fc - Show all commits

View File

@ -14,7 +14,6 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause"
"projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/job_compilers"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
@ -885,64 +884,72 @@ func (db *DB) TaskTouchedByWorker(ctx context.Context, t *Task) error {
// //
// Returns the new number of workers that failed this task. // Returns the new number of workers that failed this task.
func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker) (numFailed int, err error) { func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker) (numFailed int, err error) {
entry := TaskFailure{ queries, err := db.queries()
Task: t, if err != nil {
Worker: w, return 0, err
}
tx := db.gormDB.WithContext(ctx).
Clauses(clause.OnConflict{DoNothing: true}).
Create(&entry)
if tx.Error != nil {
return 0, tx.Error
} }
var numFailed64 int64 err = queries.AddWorkerToTaskFailedList(ctx, sqlc.AddWorkerToTaskFailedListParams{
tx = db.gormDB.WithContext(ctx).Model(&TaskFailure{}). CreatedAt: db.now().Time,
Where("task_id=?", t.ID). TaskID: int64(t.ID),
Count(&numFailed64) WorkerID: int64(w.ID),
})
if err != nil {
return 0, err
}
numFailed64, err := queries.CountWorkersFailingTask(ctx, int64(t.ID))
if err != nil {
return 0, err
}
// Integer literals are of type `int`, so that's just a bit nicer to work with // Integer literals are of type `int`, so that's just a bit nicer to work with
// than `int64`. // than `int64`.
if numFailed64 > math.MaxInt32 { if numFailed64 > math.MaxInt32 {
log.Warn().Int64("numFailed", numFailed64).Msg("number of failed workers is crazy high, something is wrong here") log.Warn().Int64("numFailed", numFailed64).Msg("number of failed workers is crazy high, something is wrong here")
return math.MaxInt32, tx.Error return math.MaxInt32, nil
} }
return int(numFailed64), tx.Error return int(numFailed64), nil
} }
// ClearFailureListOfTask clears the list of workers that failed this task. // ClearFailureListOfTask clears the list of workers that failed this task.
func (db *DB) ClearFailureListOfTask(ctx context.Context, t *Task) error { func (db *DB) ClearFailureListOfTask(ctx context.Context, t *Task) error {
tx := db.gormDB.WithContext(ctx). queries, err := db.queries()
Where("task_id = ?", t.ID). if err != nil {
Delete(&TaskFailure{}) return err
return tx.Error }
return queries.ClearFailureListOfTask(ctx, int64(t.ID))
} }
// ClearFailureListOfJob en-mass, for all tasks of this job, clears the list of // ClearFailureListOfJob en-mass, for all tasks of this job, clears the list of
// workers that failed those tasks. // workers that failed those tasks.
func (db *DB) ClearFailureListOfJob(ctx context.Context, j *Job) error { func (db *DB) ClearFailureListOfJob(ctx context.Context, j *Job) error {
queries, err := db.queries()
if err != nil {
return err
}
// SQLite doesn't support JOIN in DELETE queries, so use a sub-query instead. return queries.ClearFailureListOfJob(ctx, int64(j.ID))
jobTasksQuery := db.gormDB.Model(&Task{}).
Select("id").
Where("job_id = ?", j.ID)
tx := db.gormDB.WithContext(ctx).
Where("task_id in (?)", jobTasksQuery).
Delete(&TaskFailure{})
return tx.Error
} }
func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, error) { func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, error) {
var workers []*Worker queries, err := db.queries()
if err != nil {
return nil, err
}
tx := db.gormDB.WithContext(ctx). failureList, err := queries.FetchTaskFailureList(ctx, int64(t.ID))
Model(&Worker{}). if err != nil {
Joins("inner join task_failures TF on TF.worker_id = workers.id"). return nil, err
Where("TF.task_id = ?", t.ID). }
Scan(&workers)
return workers, tx.Error workers := make([]*Worker, len(failureList))
for idx := range failureList {
worker := convertSqlcWorker(failureList[idx].Worker)
workers[idx] = &worker
}
return workers, nil
} }
// convertSqlcJob converts a job from the SQLC-generated model to the model // convertSqlcJob converts a job from the SQLC-generated model to the model

View File

@ -164,3 +164,26 @@ WHERE job_id = @job_id AND status = @task_status;
SELECT status, count(*) as num_tasks FROM tasks SELECT status, count(*) as num_tasks FROM tasks
WHERE job_id = @job_id WHERE job_id = @job_id
GROUP BY status; GROUP BY status;
-- name: AddWorkerToTaskFailedList :exec
INSERT INTO task_failures (created_at, task_id, worker_id)
VALUES (@created_at, @task_id, @worker_id)
ON CONFLICT DO NOTHING;
-- name: CountWorkersFailingTask :one
-- Count how many workers have failed a given task.
SELECT count(*) as num_failed FROM task_failures
WHERE task_id=@task_id;
-- name: ClearFailureListOfTask :exec
DELETE FROM task_failures WHERE task_id=@task_id;
-- name: ClearFailureListOfJob :exec
-- SQLite doesn't support JOIN in DELETE queries, so use a sub-query instead.
DELETE FROM task_failures
WHERE task_id in (SELECT id FROM tasks WHERE job_id=@job_id);
-- name: FetchTaskFailureList :many
SELECT sqlc.embed(workers) FROM workers
INNER JOIN task_failures TF on TF.worker_id=workers.id
WHERE TF.task_id=@task_id;

View File

@ -13,6 +13,56 @@ import (
"time" "time"
) )
const addWorkerToTaskFailedList = `-- name: AddWorkerToTaskFailedList :exec
INSERT INTO task_failures (created_at, task_id, worker_id)
VALUES (?1, ?2, ?3)
ON CONFLICT DO NOTHING
`
type AddWorkerToTaskFailedListParams struct {
CreatedAt time.Time
TaskID int64
WorkerID int64
}
func (q *Queries) AddWorkerToTaskFailedList(ctx context.Context, arg AddWorkerToTaskFailedListParams) error {
_, err := q.db.ExecContext(ctx, addWorkerToTaskFailedList, arg.CreatedAt, arg.TaskID, arg.WorkerID)
return err
}
const clearFailureListOfJob = `-- name: ClearFailureListOfJob :exec
DELETE FROM task_failures
WHERE task_id in (SELECT id FROM tasks WHERE job_id=?1)
`
// SQLite doesn't support JOIN in DELETE queries, so use a sub-query instead.
func (q *Queries) ClearFailureListOfJob(ctx context.Context, jobID int64) error {
_, err := q.db.ExecContext(ctx, clearFailureListOfJob, jobID)
return err
}
const clearFailureListOfTask = `-- name: ClearFailureListOfTask :exec
DELETE FROM task_failures WHERE task_id=?1
`
func (q *Queries) ClearFailureListOfTask(ctx context.Context, taskID int64) error {
_, err := q.db.ExecContext(ctx, clearFailureListOfTask, taskID)
return err
}
const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one
SELECT count(*) as num_failed FROM task_failures
WHERE task_id=?1
`
// Count how many workers have failed a given task.
func (q *Queries) CountWorkersFailingTask(ctx context.Context, taskID int64) (int64, error) {
row := q.db.QueryRowContext(ctx, countWorkersFailingTask, taskID)
var num_failed int64
err := row.Scan(&num_failed)
return num_failed, err
}
const createJob = `-- name: CreateJob :exec const createJob = `-- name: CreateJob :exec
INSERT INTO jobs ( INSERT INTO jobs (
@ -271,6 +321,56 @@ func (q *Queries) FetchTask(ctx context.Context, uuid string) (FetchTaskRow, err
return i, err return i, err
} }
const fetchTaskFailureList = `-- name: FetchTaskFailureList :many
SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers
INNER JOIN task_failures TF on TF.worker_id = workers.id
WHERE TF.task_id=?1
`
type FetchTaskFailureListRow struct {
Worker Worker
}
func (q *Queries) FetchTaskFailureList(ctx context.Context, taskID int64) ([]FetchTaskFailureListRow, error) {
rows, err := q.db.QueryContext(ctx, fetchTaskFailureList, taskID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []FetchTaskFailureListRow
for rows.Next() {
var i FetchTaskFailureListRow
if err := rows.Scan(
&i.Worker.ID,
&i.Worker.CreatedAt,
&i.Worker.UpdatedAt,
&i.Worker.UUID,
&i.Worker.Secret,
&i.Worker.Name,
&i.Worker.Address,
&i.Worker.Platform,
&i.Worker.Software,
&i.Worker.Status,
&i.Worker.LastSeenAt,
&i.Worker.StatusRequested,
&i.Worker.LazyStatusRequest,
&i.Worker.SupportedTaskTypes,
&i.Worker.DeletedAt,
&i.Worker.CanRestart,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const fetchTaskJobUUID = `-- name: FetchTaskJobUUID :one const fetchTaskJobUUID = `-- name: FetchTaskJobUUID :one
SELECT jobs.UUID as jobUUID SELECT jobs.UUID as jobUUID
FROM tasks FROM tasks