WIP: convert GORM to sqlc, for jobs/tasks #104304
@ -697,38 +697,44 @@ func (db *DB) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, worker *Worke
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) {
|
func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) {
|
||||||
var numTasksInStatus int64
|
queries, err := db.queries()
|
||||||
tx := db.gormDB.WithContext(ctx).
|
if err != nil {
|
||||||
Model(&Task{}).
|
return false, err
|
||||||
Where("job_id", job.ID).
|
|
||||||
Where("status", taskStatus).
|
|
||||||
Count(&numTasksInStatus)
|
|
||||||
if tx.Error != nil {
|
|
||||||
return false, taskError(tx.Error, "counting tasks of job %s in status %q", job.UUID, taskStatus)
|
|
||||||
}
|
}
|
||||||
return numTasksInStatus > 0, nil
|
|
||||||
|
count, err := queries.JobCountTasksInStatus(ctx, sqlc.JobCountTasksInStatusParams{
|
||||||
|
JobID: int64(job.ID),
|
||||||
|
TaskStatus: string(taskStatus),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return false, taskError(err, "counting tasks of job %s in status %q", job.UUID, taskStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
return count > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CountTasksOfJobInStatus counts the number of tasks in the job.
|
||||||
|
// It returns two counts, one is the number of tasks in the given statuses, the
|
||||||
|
// other is the total number of tasks of the job.
|
||||||
func (db *DB) CountTasksOfJobInStatus(
|
func (db *DB) CountTasksOfJobInStatus(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
job *Job,
|
job *Job,
|
||||||
taskStatuses ...api.TaskStatus,
|
taskStatuses ...api.TaskStatus,
|
||||||
) (numInStatus, numTotal int, err error) {
|
) (numInStatus, numTotal int, err error) {
|
||||||
type Result struct {
|
queries, err := db.queries()
|
||||||
Status api.TaskStatus
|
if err != nil {
|
||||||
NumTasks int
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
var results []Result
|
|
||||||
|
|
||||||
tx := db.gormDB.WithContext(ctx).
|
// Convert from []api.TaskStatus to []string for feeding to sqlc.
|
||||||
Model(&Task{}).
|
statusesAsStrings := make([]string, len(taskStatuses))
|
||||||
Select("status, count(*) as num_tasks").
|
for index := range taskStatuses {
|
||||||
Where("job_id", job.ID).
|
statusesAsStrings[index] = string(taskStatuses[index])
|
||||||
Group("status").
|
}
|
||||||
Scan(&results)
|
|
||||||
|
|
||||||
if tx.Error != nil {
|
results, err := queries.JobCountTaskStatuses(ctx, int64(job.ID))
|
||||||
return 0, 0, jobError(tx.Error, "count tasks of job %s in status %q", job.UUID, taskStatuses)
|
if err != nil {
|
||||||
|
return 0, 0, jobError(err, "count tasks of job %s in status %q", job.UUID, taskStatuses)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create lookup table for which statuses to count.
|
// Create lookup table for which statuses to count.
|
||||||
@ -739,10 +745,10 @@ func (db *DB) CountTasksOfJobInStatus(
|
|||||||
|
|
||||||
// Count the number of tasks per status.
|
// Count the number of tasks per status.
|
||||||
for _, result := range results {
|
for _, result := range results {
|
||||||
if countStatus[result.Status] {
|
if countStatus[api.TaskStatus(result.Status)] {
|
||||||
numInStatus += result.NumTasks
|
numInStatus += int(result.NumTasks)
|
||||||
}
|
}
|
||||||
numTotal += result.NumTasks
|
numTotal += int(result.NumTasks)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -396,6 +396,12 @@ func TestCountTasksOfJobInStatus(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, 0, numActive)
|
assert.Equal(t, 0, numActive)
|
||||||
assert.Equal(t, 3, numTotal)
|
assert.Equal(t, 3, numTotal)
|
||||||
|
|
||||||
|
numCounted, numTotal, err := db.CountTasksOfJobInStatus(ctx, job,
|
||||||
|
api.TaskStatusFailed, api.TaskStatusQueued)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 3, numCounted)
|
||||||
|
assert.Equal(t, 3, numTotal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) {
|
func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) {
|
||||||
|
@ -120,3 +120,14 @@ UPDATE tasks SET
|
|||||||
updated_at = @updated_at,
|
updated_at = @updated_at,
|
||||||
worker_id = @worker_id
|
worker_id = @worker_id
|
||||||
WHERE id=@id;
|
WHERE id=@id;
|
||||||
|
|
||||||
|
-- name: JobCountTasksInStatus :one
|
||||||
|
-- Fetch number of tasks in the given status, of the given job.
|
||||||
|
SELECT count(*) as num_tasks FROM tasks
|
||||||
|
WHERE job_id = @job_id AND status = @task_status;
|
||||||
|
|
||||||
|
-- name: JobCountTaskStatuses :many
|
||||||
|
-- Fetch (status, num tasks in that status) rows for the given job.
|
||||||
|
SELECT status, count(*) as num_tasks FROM tasks
|
||||||
|
WHERE job_id = @job_id
|
||||||
|
GROUP BY status;
|
||||||
|
@ -396,6 +396,59 @@ func (q *Queries) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, arg Fetch
|
|||||||
return items, nil
|
return items, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const jobCountTaskStatuses = `-- name: JobCountTaskStatuses :many
|
||||||
|
SELECT status, count(*) as num_tasks FROM tasks
|
||||||
|
WHERE job_id = ?1
|
||||||
|
GROUP BY status
|
||||||
|
`
|
||||||
|
|
||||||
|
type JobCountTaskStatusesRow struct {
|
||||||
|
Status string
|
||||||
|
NumTasks int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch (status, num tasks in that status) rows for the given job.
|
||||||
|
func (q *Queries) JobCountTaskStatuses(ctx context.Context, jobID int64) ([]JobCountTaskStatusesRow, error) {
|
||||||
|
rows, err := q.db.QueryContext(ctx, jobCountTaskStatuses, jobID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var items []JobCountTaskStatusesRow
|
||||||
|
for rows.Next() {
|
||||||
|
var i JobCountTaskStatusesRow
|
||||||
|
if err := rows.Scan(&i.Status, &i.NumTasks); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
items = append(items, i)
|
||||||
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return items, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const jobCountTasksInStatus = `-- name: JobCountTasksInStatus :one
|
||||||
|
SELECT count(*) as num_tasks FROM tasks
|
||||||
|
WHERE job_id = ?1 AND status = ?2
|
||||||
|
`
|
||||||
|
|
||||||
|
type JobCountTasksInStatusParams struct {
|
||||||
|
JobID int64
|
||||||
|
TaskStatus string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch number of tasks in the given status, of the given job.
|
||||||
|
func (q *Queries) JobCountTasksInStatus(ctx context.Context, arg JobCountTasksInStatusParams) (int64, error) {
|
||||||
|
row := q.db.QueryRowContext(ctx, jobCountTasksInStatus, arg.JobID, arg.TaskStatus)
|
||||||
|
var num_tasks int64
|
||||||
|
err := row.Scan(&num_tasks)
|
||||||
|
return num_tasks, err
|
||||||
|
}
|
||||||
|
|
||||||
const requestJobDeletion = `-- name: RequestJobDeletion :exec
|
const requestJobDeletion = `-- name: RequestJobDeletion :exec
|
||||||
UPDATE jobs SET
|
UPDATE jobs SET
|
||||||
updated_at = ?1,
|
updated_at = ?1,
|
||||||
|
Loading…
Reference in New Issue
Block a user