WIP: convert GORM to sqlc, for jobs/tasks #104304
@ -756,39 +756,59 @@ func (db *DB) CountTasksOfJobInStatus(
|
||||
|
||||
// FetchTaskIDsOfJob returns all tasks of the given job.
|
||||
func (db *DB) FetchTasksOfJob(ctx context.Context, job *Job) ([]*Task, error) {
|
||||
var tasks []*Task
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Model(&Task{}).
|
||||
Where("job_id", job.ID).
|
||||
Scan(&tasks)
|
||||
if tx.Error != nil {
|
||||
return nil, taskError(tx.Error, "fetching tasks of job %s", job.UUID)
|
||||
queries, err := db.queries()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range tasks {
|
||||
tasks[i].Job = job
|
||||
rows, err := queries.FetchTasksOfJob(ctx, int64(job.ID))
|
||||
if err != nil {
|
||||
return nil, taskError(err, "fetching tasks of job %s", job.UUID)
|
||||
}
|
||||
|
||||
return tasks, nil
|
||||
result := make([]*Task, len(rows))
|
||||
for i := range rows {
|
||||
gormTask, err := convertSqlcTask(rows[i].Task, job.UUID, rows[i].WorkerUUID.String)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gormTask.Job = job
|
||||
result[i] = gormTask
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// FetchTasksOfJobInStatus returns those tasks of the given job that have any of the given statuses.
|
||||
func (db *DB) FetchTasksOfJobInStatus(ctx context.Context, job *Job, taskStatuses ...api.TaskStatus) ([]*Task, error) {
|
||||
var tasks []*Task
|
||||
tx := db.gormDB.WithContext(ctx).
|
||||
Model(&Task{}).
|
||||
Where("job_id", job.ID).
|
||||
Where("status in ?", taskStatuses).
|
||||
Scan(&tasks)
|
||||
if tx.Error != nil {
|
||||
return nil, taskError(tx.Error, "fetching tasks of job %s in status %q", job.UUID, taskStatuses)
|
||||
queries, err := db.queries()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range tasks {
|
||||
tasks[i].Job = job
|
||||
// Convert from []api.TaskStatus to []string for feeding to sqlc.
|
||||
statusesAsStrings := make([]string, len(taskStatuses))
|
||||
for index := range taskStatuses {
|
||||
statusesAsStrings[index] = string(taskStatuses[index])
|
||||
}
|
||||
|
||||
return tasks, nil
|
||||
rows, err := queries.FetchTasksOfJobInStatus(ctx, sqlc.FetchTasksOfJobInStatusParams{
|
||||
JobID: int64(job.ID),
|
||||
TaskStatus: statusesAsStrings,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, taskError(err, "fetching tasks of job %s in status %q", job.UUID, taskStatuses)
|
||||
}
|
||||
|
||||
result := make([]*Task, len(rows))
|
||||
for i := range rows {
|
||||
gormTask, err := convertSqlcTask(rows[i].Task, job.UUID, rows[i].WorkerUUID.String)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gormTask.Job = job
|
||||
result[i] = gormTask
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// UpdateJobsTaskStatuses updates the status & activity of all tasks of `job`.
|
||||
|
@ -83,6 +83,19 @@ WHERE tasks.worker_id = @worker_id
|
||||
AND tasks.job_id = @job_id
|
||||
AND tasks.status = @task_status;
|
||||
|
||||
-- name: FetchTasksOfJob :many
|
||||
SELECT sqlc.embed(tasks), workers.UUID as workerUUID
|
||||
FROM tasks
|
||||
LEFT JOIN workers ON (tasks.worker_id = workers.id)
|
||||
WHERE tasks.job_id = @job_id;
|
||||
|
||||
-- name: FetchTasksOfJobInStatus :many
|
||||
SELECT sqlc.embed(tasks), workers.UUID as workerUUID
|
||||
FROM tasks
|
||||
LEFT JOIN workers ON (tasks.worker_id = workers.id)
|
||||
WHERE tasks.job_id = @job_id
|
||||
AND tasks.status in (sqlc.slice('task_status'));
|
||||
|
||||
-- name: FetchTaskJobUUID :one
|
||||
SELECT jobs.UUID as jobUUID
|
||||
FROM tasks
|
||||
|
@ -285,6 +285,123 @@ func (q *Queries) FetchTaskJobUUID(ctx context.Context, uuid string) (sql.NullSt
|
||||
return jobuuid, err
|
||||
}
|
||||
|
||||
const fetchTasksOfJob = `-- name: FetchTasksOfJob :many
|
||||
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, workers.UUID as workerUUID
|
||||
FROM tasks
|
||||
LEFT JOIN workers ON (tasks.worker_id = workers.id)
|
||||
WHERE tasks.job_id = ?1
|
||||
`
|
||||
|
||||
type FetchTasksOfJobRow struct {
|
||||
Task Task
|
||||
WorkerUUID sql.NullString
|
||||
}
|
||||
|
||||
func (q *Queries) FetchTasksOfJob(ctx context.Context, jobID int64) ([]FetchTasksOfJobRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, fetchTasksOfJob, jobID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []FetchTasksOfJobRow
|
||||
for rows.Next() {
|
||||
var i FetchTasksOfJobRow
|
||||
if err := rows.Scan(
|
||||
&i.Task.ID,
|
||||
&i.Task.CreatedAt,
|
||||
&i.Task.UpdatedAt,
|
||||
&i.Task.UUID,
|
||||
&i.Task.Name,
|
||||
&i.Task.Type,
|
||||
&i.Task.JobID,
|
||||
&i.Task.Priority,
|
||||
&i.Task.Status,
|
||||
&i.Task.WorkerID,
|
||||
&i.Task.LastTouchedAt,
|
||||
&i.Task.Commands,
|
||||
&i.Task.Activity,
|
||||
&i.WorkerUUID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const fetchTasksOfJobInStatus = `-- name: FetchTasksOfJobInStatus :many
|
||||
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, workers.UUID as workerUUID
|
||||
FROM tasks
|
||||
LEFT JOIN workers ON (tasks.worker_id = workers.id)
|
||||
WHERE tasks.job_id = ?1
|
||||
AND tasks.status in (/*SLICE:task_status*/?)
|
||||
`
|
||||
|
||||
type FetchTasksOfJobInStatusParams struct {
|
||||
JobID int64
|
||||
TaskStatus []string
|
||||
}
|
||||
|
||||
type FetchTasksOfJobInStatusRow struct {
|
||||
Task Task
|
||||
WorkerUUID sql.NullString
|
||||
}
|
||||
|
||||
func (q *Queries) FetchTasksOfJobInStatus(ctx context.Context, arg FetchTasksOfJobInStatusParams) ([]FetchTasksOfJobInStatusRow, error) {
|
||||
query := fetchTasksOfJobInStatus
|
||||
var queryParams []interface{}
|
||||
queryParams = append(queryParams, arg.JobID)
|
||||
if len(arg.TaskStatus) > 0 {
|
||||
for _, v := range arg.TaskStatus {
|
||||
queryParams = append(queryParams, v)
|
||||
}
|
||||
query = strings.Replace(query, "/*SLICE:task_status*/?", strings.Repeat(",?", len(arg.TaskStatus))[1:], 1)
|
||||
} else {
|
||||
query = strings.Replace(query, "/*SLICE:task_status*/?", "NULL", 1)
|
||||
}
|
||||
rows, err := q.db.QueryContext(ctx, query, queryParams...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []FetchTasksOfJobInStatusRow
|
||||
for rows.Next() {
|
||||
var i FetchTasksOfJobInStatusRow
|
||||
if err := rows.Scan(
|
||||
&i.Task.ID,
|
||||
&i.Task.CreatedAt,
|
||||
&i.Task.UpdatedAt,
|
||||
&i.Task.UUID,
|
||||
&i.Task.Name,
|
||||
&i.Task.Type,
|
||||
&i.Task.JobID,
|
||||
&i.Task.Priority,
|
||||
&i.Task.Status,
|
||||
&i.Task.WorkerID,
|
||||
&i.Task.LastTouchedAt,
|
||||
&i.Task.Commands,
|
||||
&i.Task.Activity,
|
||||
&i.WorkerUUID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const fetchTasksOfWorkerInStatus = `-- name: FetchTasksOfWorkerInStatus :many
|
||||
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID
|
||||
FROM tasks
|
||||
|
Loading…
Reference in New Issue
Block a user