Sync branch magefile with main #104308

Merged
Sybren A. Stüvel merged 85 commits from abelli/flamenco:magefile into magefile 2024-05-13 16:26:32 +02:00
5 changed files with 312 additions and 68 deletions
Showing only changes of commit b102b73a1f - Show all commits

View File

@ -5,6 +5,7 @@ package persistence
import (
"context"
"database/sql"
"fmt"
"time"
@ -182,6 +183,14 @@ func (db *DB) queries() (*sqlc.Queries, error) {
return sqlc.New(sqldb), nil
}
// now returns the result of `nowFunc()` wrapped in a sql.NullTime.
func (db *DB) now() sql.NullTime {
return sql.NullTime{
Time: db.gormDB.NowFunc(),
Valid: true,
}
}
func (db *DB) pragmaForeignKeys(enabled bool) error {
var (
value int

View File

@ -300,8 +300,7 @@ func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error {
}
// Update the given job itself, so we don't have to re-fetch it from the database.
j.DeleteRequestedAt.Time = db.gormDB.NowFunc()
j.DeleteRequestedAt.Valid = true
j.DeleteRequestedAt = db.now()
params := sqlc.RequestJobDeletionParams{
Now: j.DeleteRequestedAt,
@ -321,98 +320,114 @@ func (db *DB) RequestJobDeletion(ctx context.Context, j *Job) error {
// RequestJobMassDeletion sets multiple job's "DeletionRequestedAt" field to "now".
// The list of affected job UUIDs is returned.
func (db *DB) RequestJobMassDeletion(ctx context.Context, lastUpdatedMax time.Time) ([]string, error) {
// In order to be able to report which jobs were affected, first fetch the
// list of jobs, then update them.
var jobs []*Job
selectResult := db.gormDB.WithContext(ctx).
Model(&Job{}).
Select("uuid").
Where("updated_at <= ?", lastUpdatedMax).
Scan(&jobs)
if selectResult.Error != nil {
return nil, jobError(selectResult.Error, "fetching jobs by last-modified timestamp")
queries, err := db.queries()
if err != nil {
return nil, err
}
if len(jobs) == 0 {
// In order to be able to report which jobs were affected, first fetch the
// list of jobs, then update them.
uuids, err := queries.FetchJobUUIDsUpdatedBefore(ctx, sql.NullTime{
Time: lastUpdatedMax,
Valid: true,
})
switch {
case err != nil:
return nil, jobError(err, "fetching jobs by last-modified timestamp")
case len(uuids) == 0:
return nil, ErrJobNotFound
}
// Convert array of jobs to array of UUIDs.
uuids := make([]string, len(jobs))
for index := range jobs {
uuids[index] = jobs[index].UUID
}
// Update the selected jobs.
deleteRequestedAt := sql.NullTime{
Time: db.gormDB.NowFunc(),
Valid: true,
params := sqlc.RequestMassJobDeletionParams{
Now: db.now(),
UUIDs: uuids,
}
tx := db.gormDB.WithContext(ctx).
Model(Job{}).
Where("uuid in ?", uuids).
Updates(Job{DeleteRequestedAt: deleteRequestedAt})
if tx.Error != nil {
return nil, jobError(tx.Error, "queueing jobs for deletion")
if err := queries.RequestMassJobDeletion(ctx, params); err != nil {
return nil, jobError(err, "marking jobs as deletion-requested")
}
return uuids, nil
}
func (db *DB) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) {
var jobs []*Job
tx := db.gormDB.WithContext(ctx).
Model(&Job{}).
Select("UUID").
Where("delete_requested_at is not NULL").
Order("delete_requested_at").
Scan(&jobs)
if tx.Error != nil {
return nil, jobError(tx.Error, "fetching jobs marked for deletion")
queries, err := db.queries()
if err != nil {
return nil, err
}
uuids := make([]string, len(jobs))
for i := range jobs {
uuids[i] = jobs[i].UUID
uuids, err := queries.FetchJobsDeletionRequested(ctx)
if err != nil {
return nil, jobError(err, "fetching jobs marked for deletion")
}
return uuids, nil
}
func (db *DB) FetchJobsInStatus(ctx context.Context, jobStatuses ...api.JobStatus) ([]*Job, error) {
var jobs []*Job
tx := db.gormDB.WithContext(ctx).
Model(&Job{}).
Where("status in ?", jobStatuses).
Scan(&jobs)
if tx.Error != nil {
return nil, jobError(tx.Error, "fetching jobs in status %q", jobStatuses)
queries, err := db.queries()
if err != nil {
return nil, err
}
statuses := []string{}
for _, status := range jobStatuses {
statuses = append(statuses, string(status))
}
sqlcJobs, err := queries.FetchJobsInStatus(ctx, statuses)
if err != nil {
return nil, jobError(err, "fetching jobs in status %q", jobStatuses)
}
var jobs []*Job
for index := range sqlcJobs {
job, err := convertSqlcJob(sqlcJobs[index])
if err != nil {
return nil, jobError(err, "converting fetched jobs in status %q", jobStatuses)
}
jobs = append(jobs, job)
}
return jobs, nil
}
// SaveJobStatus saves the job's Status and Activity fields.
func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
tx := db.gormDB.WithContext(ctx).
Model(j).
Updates(Job{Status: j.Status, Activity: j.Activity})
if tx.Error != nil {
return jobError(tx.Error, "saving job status")
queries, err := db.queries()
if err != nil {
return err
}
params := sqlc.SaveJobStatusParams{
Now: db.now(),
ID: int64(j.ID),
Status: string(j.Status),
Activity: j.Activity,
}
err = queries.SaveJobStatus(ctx, params)
if err != nil {
return jobError(err, "saving job status")
}
return nil
}
// SaveJobPriority saves the job's Priority field.
func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error {
tx := db.gormDB.WithContext(ctx).
Model(j).
Updates(Job{Priority: j.Priority})
if tx.Error != nil {
return jobError(tx.Error, "saving job priority")
queries, err := db.queries()
if err != nil {
return err
}
params := sqlc.SaveJobPriorityParams{
Now: db.now(),
ID: int64(j.ID),
Priority: int64(j.Priority),
}
err = queries.SaveJobPriority(ctx, params)
if err != nil {
return jobError(err, "saving job priority")
}
return nil
}
@ -421,12 +436,19 @@ func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error {
// NOTE: this function does NOT update the job's `UpdatedAt` field. This is
// necessary for `cmd/shaman-checkout-id-setter` to do its work quietly.
func (db *DB) SaveJobStorageInfo(ctx context.Context, j *Job) error {
tx := db.gormDB.WithContext(ctx).
Model(j).
Omit("UpdatedAt").
Updates(Job{Storage: j.Storage})
if tx.Error != nil {
return jobError(tx.Error, "saving job storage")
queries, err := db.queries()
if err != nil {
return err
}
params := sqlc.SaveJobStorageInfoParams{
ID: int64(j.ID),
StorageShamanCheckoutID: j.Storage.ShamanCheckoutID,
}
err = queries.SaveJobStorageInfo(ctx, params)
if err != nil {
return jobError(err, "saving job storage")
}
return nil
}

View File

@ -30,3 +30,28 @@ UPDATE jobs SET
delete_requested_at = @now
WHERE id = sqlc.arg('job_id');
-- name: FetchJobUUIDsUpdatedBefore :many
SELECT uuid FROM jobs WHERE updated_at <= @updated_at_max;
-- name: RequestMassJobDeletion :exec
UPDATE jobs SET
updated_at = @now,
delete_requested_at = @now
WHERE uuid in (sqlc.slice('uuids'));
-- name: FetchJobsDeletionRequested :many
SELECT uuid FROM jobs
WHERE delete_requested_at is not NULL
ORDER BY delete_requested_at;
-- name: FetchJobsInStatus :many
SELECT * FROM jobs WHERE status IN (sqlc.slice('statuses'));
-- name: SaveJobStatus :exec
UPDATE jobs SET updated_at=@now, status=@status, activity=@activity WHERE id=@id;
-- name: SaveJobPriority :exec
UPDATE jobs SET updated_at=@now, priority=@priority WHERE id=@id;
-- name: SaveJobStorageInfo :exec
UPDATE jobs SET storage_shaman_checkout_id=@storage_shaman_checkout_id WHERE id=@id;

View File

@ -9,6 +9,7 @@ import (
"context"
"database/sql"
"encoding/json"
"strings"
"time"
)
@ -95,6 +96,114 @@ func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) {
return i, err
}
const fetchJobUUIDsUpdatedBefore = `-- name: FetchJobUUIDsUpdatedBefore :many
SELECT uuid FROM jobs WHERE updated_at <= ?1
`
func (q *Queries) FetchJobUUIDsUpdatedBefore(ctx context.Context, updatedAtMax sql.NullTime) ([]string, error) {
rows, err := q.db.QueryContext(ctx, fetchJobUUIDsUpdatedBefore, updatedAtMax)
if err != nil {
return nil, err
}
defer rows.Close()
var items []string
for rows.Next() {
var uuid string
if err := rows.Scan(&uuid); err != nil {
return nil, err
}
items = append(items, uuid)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const fetchJobsDeletionRequested = `-- name: FetchJobsDeletionRequested :many
SELECT uuid FROM jobs
WHERE delete_requested_at is not NULL
ORDER BY delete_requested_at
`
func (q *Queries) FetchJobsDeletionRequested(ctx context.Context) ([]string, error) {
rows, err := q.db.QueryContext(ctx, fetchJobsDeletionRequested)
if err != nil {
return nil, err
}
defer rows.Close()
var items []string
for rows.Next() {
var uuid string
if err := rows.Scan(&uuid); err != nil {
return nil, err
}
items = append(items, uuid)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const fetchJobsInStatus = `-- name: FetchJobsInStatus :many
SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs WHERE status IN (/*SLICE:statuses*/?)
`
func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []string) ([]Job, error) {
query := fetchJobsInStatus
var queryParams []interface{}
if len(statuses) > 0 {
for _, v := range statuses {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:statuses*/?", strings.Repeat(",?", len(statuses))[1:], 1)
} else {
query = strings.Replace(query, "/*SLICE:statuses*/?", "NULL", 1)
}
rows, err := q.db.QueryContext(ctx, query, queryParams...)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Job
for rows.Next() {
var i Job
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.UUID,
&i.Name,
&i.JobType,
&i.Priority,
&i.Status,
&i.Activity,
&i.Settings,
&i.Metadata,
&i.DeleteRequestedAt,
&i.StorageShamanCheckoutID,
&i.WorkerTagID,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const requestJobDeletion = `-- name: RequestJobDeletion :exec
UPDATE jobs SET
updated_at = ?1,
@ -111,3 +220,81 @@ func (q *Queries) RequestJobDeletion(ctx context.Context, arg RequestJobDeletion
_, err := q.db.ExecContext(ctx, requestJobDeletion, arg.Now, arg.JobID)
return err
}
const requestMassJobDeletion = `-- name: RequestMassJobDeletion :exec
UPDATE jobs SET
updated_at = ?1,
delete_requested_at = ?1
WHERE uuid in (/*SLICE:uuids*/?)
`
type RequestMassJobDeletionParams struct {
Now sql.NullTime
UUIDs []string
}
func (q *Queries) RequestMassJobDeletion(ctx context.Context, arg RequestMassJobDeletionParams) error {
query := requestMassJobDeletion
var queryParams []interface{}
queryParams = append(queryParams, arg.Now)
if len(arg.UUIDs) > 0 {
for _, v := range arg.UUIDs {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:uuids*/?", strings.Repeat(",?", len(arg.UUIDs))[1:], 1)
} else {
query = strings.Replace(query, "/*SLICE:uuids*/?", "NULL", 1)
}
_, err := q.db.ExecContext(ctx, query, queryParams...)
return err
}
const saveJobPriority = `-- name: SaveJobPriority :exec
UPDATE jobs SET updated_at=?1, priority=?2 WHERE id=?3
`
type SaveJobPriorityParams struct {
Now sql.NullTime
Priority int64
ID int64
}
func (q *Queries) SaveJobPriority(ctx context.Context, arg SaveJobPriorityParams) error {
_, err := q.db.ExecContext(ctx, saveJobPriority, arg.Now, arg.Priority, arg.ID)
return err
}
const saveJobStatus = `-- name: SaveJobStatus :exec
UPDATE jobs SET updated_at=?1, status=?2, activity=?3 WHERE id=?4
`
type SaveJobStatusParams struct {
Now sql.NullTime
Status string
Activity string
ID int64
}
func (q *Queries) SaveJobStatus(ctx context.Context, arg SaveJobStatusParams) error {
_, err := q.db.ExecContext(ctx, saveJobStatus,
arg.Now,
arg.Status,
arg.Activity,
arg.ID,
)
return err
}
const saveJobStorageInfo = `-- name: SaveJobStorageInfo :exec
UPDATE jobs SET storage_shaman_checkout_id=?1 WHERE id=?2
`
type SaveJobStorageInfoParams struct {
StorageShamanCheckoutID string
ID int64
}
func (q *Queries) SaveJobStorageInfo(ctx context.Context, arg SaveJobStorageInfoParams) error {
_, err := q.db.ExecContext(ctx, saveJobStorageInfo, arg.StorageShamanCheckoutID, arg.ID)
return err
}

View File

@ -13,3 +13,4 @@ sql:
type: "RawMessage"
rename:
uuid: "UUID"
uuids: "UUIDs"