WIP: convert GORM to sqlc, for jobs/tasks #104304
@ -184,7 +184,9 @@ func (db *DB) queries() (*sqlc.Queries, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get low-level database driver: %w", err)
|
||||
}
|
||||
return sqlc.New(sqldb), nil
|
||||
|
||||
loggingWrapper := LoggingDBConn{sqldb}
|
||||
return sqlc.New(&loggingWrapper), nil
|
||||
}
|
||||
|
||||
// now returns the result of `nowFunc()` wrapped in a sql.NullTime.
|
||||
|
@ -2,6 +2,7 @@
|
||||
package persistence
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
@ -9,6 +10,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// TODO: let these errors wrap database/sql.ErrNoRows.
|
||||
ErrJobNotFound = PersistenceError{Message: "job not found", Err: gorm.ErrRecordNotFound}
|
||||
ErrTaskNotFound = PersistenceError{Message: "task not found", Err: gorm.ErrRecordNotFound}
|
||||
ErrWorkerNotFound = PersistenceError{Message: "worker not found", Err: gorm.ErrRecordNotFound}
|
||||
@ -63,36 +65,48 @@ func wrapError(errorToWrap error, message string, format ...interface{}) error {
|
||||
|
||||
// translateGormJobError translates a Gorm error to a persistence layer error.
|
||||
// This helps to keep Gorm as "implementation detail" of the persistence layer.
|
||||
func translateGormJobError(gormError error) error {
|
||||
if errors.Is(gormError, gorm.ErrRecordNotFound) {
|
||||
func translateGormJobError(err error) error {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return ErrTaskNotFound
|
||||
}
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return ErrJobNotFound
|
||||
}
|
||||
return gormError
|
||||
return err
|
||||
}
|
||||
|
||||
// translateGormTaskError translates a Gorm error to a persistence layer error.
|
||||
// This helps to keep Gorm as "implementation detail" of the persistence layer.
|
||||
func translateGormTaskError(gormError error) error {
|
||||
if errors.Is(gormError, gorm.ErrRecordNotFound) {
|
||||
func translateGormTaskError(err error) error {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return ErrTaskNotFound
|
||||
}
|
||||
return gormError
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return ErrTaskNotFound
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// translateGormWorkerError translates a Gorm error to a persistence layer error.
|
||||
// This helps to keep Gorm as "implementation detail" of the persistence layer.
|
||||
func translateGormWorkerError(gormError error) error {
|
||||
if errors.Is(gormError, gorm.ErrRecordNotFound) {
|
||||
func translateGormWorkerError(err error) error {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return ErrWorkerNotFound
|
||||
}
|
||||
return gormError
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return ErrWorkerNotFound
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// translateGormWorkerTagError translates a Gorm error to a persistence layer error.
|
||||
// This helps to keep Gorm as "implementation detail" of the persistence layer.
|
||||
func translateGormWorkerTagError(gormError error) error {
|
||||
if errors.Is(gormError, gorm.ErrRecordNotFound) {
|
||||
func translateGormWorkerTagError(err error) error {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return ErrWorkerTagNotFound
|
||||
}
|
||||
return gormError
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return ErrWorkerTagNotFound
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -477,11 +477,8 @@ func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
|
||||
if err != nil {
|
||||
return nil, taskError(err, "fetching worker assigned to task %s", taskUUID)
|
||||
}
|
||||
convertedWorker, err := convertSqlcWorker(worker)
|
||||
if err != nil {
|
||||
return nil, taskError(err, "converting worker assigned to task %s", taskUUID)
|
||||
}
|
||||
convertedTask.Worker = convertedWorker
|
||||
convertedWorker := convertSqlcWorker(worker)
|
||||
convertedTask.Worker = &convertedWorker
|
||||
}
|
||||
|
||||
return convertedTask, nil
|
||||
|
@ -4,13 +4,16 @@ package persistence
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"gorm.io/gorm"
|
||||
gormlogger "gorm.io/gorm/logger"
|
||||
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
|
||||
)
|
||||
|
||||
// dbLogger implements the behaviour of Gorm's default logger on top of Zerolog.
|
||||
@ -126,3 +129,28 @@ func (l dbLogger) logger(args ...interface{}) zerolog.Logger {
|
||||
}
|
||||
return logCtx.Logger()
|
||||
}
|
||||
|
||||
// LoggingDBConn wraps a database/sql.DB connection, so that it can be used with
|
||||
// sqlc and log all the queries.
|
||||
type LoggingDBConn struct {
|
||||
wrappedConn sqlc.DBTX
|
||||
}
|
||||
|
||||
var _ sqlc.DBTX = (*LoggingDBConn)(nil)
|
||||
|
||||
func (ldbc *LoggingDBConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) {
|
||||
log.Trace().Str("sql", sql).Interface("args", args).Msg("database: query Exec")
|
||||
return ldbc.wrappedConn.ExecContext(ctx, sql, args...)
|
||||
}
|
||||
func (ldbc *LoggingDBConn) PrepareContext(ctx context.Context, sql string) (*sql.Stmt, error) {
|
||||
log.Trace().Str("sql", sql).Msg("database: query Prepare")
|
||||
return ldbc.wrappedConn.PrepareContext(ctx, sql)
|
||||
}
|
||||
func (ldbc *LoggingDBConn) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
|
||||
log.Trace().Str("sql", sql).Interface("args", args).Msg("database: query Query")
|
||||
return ldbc.wrappedConn.QueryContext(ctx, sql, args...)
|
||||
}
|
||||
func (ldbc *LoggingDBConn) QueryRowContext(ctx context.Context, sql string, args ...interface{}) *sql.Row {
|
||||
log.Trace().Str("sql", sql).Interface("args", args).Msg("database: query QueryRow")
|
||||
return ldbc.wrappedConn.QueryRowContext(ctx, sql, args...)
|
||||
}
|
||||
|
@ -4,8 +4,15 @@
|
||||
|
||||
-- name: FetchWorker :one
|
||||
-- FetchWorker only returns the worker if it wasn't soft-deleted.
|
||||
SELECT * FROM workers WHERE workers.uuid = @uuid and deleted_at is not NULL;
|
||||
SELECT * FROM workers WHERE workers.uuid = @uuid and deleted_at is NULL;
|
||||
|
||||
-- name: FetchWorkerUnconditional :one
|
||||
-- FetchWorkerUnconditional ignores soft-deletion status and just returns the worker.
|
||||
SELECT * FROM workers WHERE workers.uuid = @uuid;
|
||||
|
||||
-- name: FetchWorkerTags :many
|
||||
SELECT worker_tags.*
|
||||
FROM workers
|
||||
LEFT JOIN worker_tag_membership memb ON (memb.worker_id = workers.id)
|
||||
LEFT JOIN worker_tags ON (memb.worker_tag_id = worker_tags.id)
|
||||
WHERE workers.uuid = @uuid;
|
||||
|
@ -7,11 +7,12 @@ package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
const fetchWorker = `-- name: FetchWorker :one
|
||||
|
||||
SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.uuid = ?1 and deleted_at is not NULL
|
||||
SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.uuid = ?1 and deleted_at is NULL
|
||||
`
|
||||
|
||||
// Worker queries
|
||||
@ -41,6 +42,53 @@ func (q *Queries) FetchWorker(ctx context.Context, uuid string) (Worker, error)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const fetchWorkerTags = `-- name: FetchWorkerTags :many
|
||||
SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description
|
||||
FROM workers
|
||||
LEFT JOIN worker_tag_membership memb ON (memb.worker_id = workers.id)
|
||||
LEFT JOIN worker_tags ON (memb.worker_tag_id = worker_tags.id)
|
||||
WHERE workers.uuid = ?1
|
||||
`
|
||||
|
||||
type FetchWorkerTagsRow struct {
|
||||
ID sql.NullInt64
|
||||
CreatedAt sql.NullTime
|
||||
UpdatedAt sql.NullTime
|
||||
UUID sql.NullString
|
||||
Name sql.NullString
|
||||
Description sql.NullString
|
||||
}
|
||||
|
||||
func (q *Queries) FetchWorkerTags(ctx context.Context, uuid string) ([]FetchWorkerTagsRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, fetchWorkerTags, uuid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []FetchWorkerTagsRow
|
||||
for rows.Next() {
|
||||
var i FetchWorkerTagsRow
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.UUID,
|
||||
&i.Name,
|
||||
&i.Description,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const fetchWorkerUnconditional = `-- name: FetchWorkerUnconditional :one
|
||||
SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.uuid = ?1
|
||||
`
|
||||
|
@ -15,13 +15,13 @@ import (
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gorm.io/gorm"
|
||||
"projects.blender.org/studio/flamenco/internal/uuid"
|
||||
"projects.blender.org/studio/flamenco/pkg/api"
|
||||
)
|
||||
|
||||
// Change this to a filename if you want to run a single test and inspect the
|
||||
// resulting database.
|
||||
const TestDSN = "file::memory:"
|
||||
// const TestDSN = "file::memory:"
|
||||
const TestDSN = "C:/workspace/flamenco/unittest.sqlite"
|
||||
|
||||
func CreateTestDB(t *testing.T) (db *DB, closer func()) {
|
||||
// Delete the SQLite file if it exists on disk.
|
||||
@ -106,7 +106,7 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe
|
||||
ctx, cancel, db := persistenceTestFixtures(t, testContextTimeout)
|
||||
|
||||
w := Worker{
|
||||
UUID: uuid.New(),
|
||||
UUID: "557930e7-5b55-469e-a6d7-fc800f3685be",
|
||||
Name: "дрон",
|
||||
Address: "fe80::5054:ff:fede:2ad7",
|
||||
Platform: "linux",
|
||||
@ -116,7 +116,7 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe
|
||||
}
|
||||
|
||||
wc := WorkerTag{
|
||||
UUID: uuid.New(),
|
||||
UUID: "e0e05417-9793-4829-b1d0-d446dd819f3d",
|
||||
Name: "arbejdsklynge",
|
||||
Description: "Worker tag in Danish",
|
||||
}
|
||||
|
@ -84,7 +84,21 @@ func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) {
|
||||
return nil, workerError(err, "fetching worker %s", uuid)
|
||||
}
|
||||
|
||||
return convertSqlcWorker(worker)
|
||||
// TODO: remove this code, and let the caller fetch the tags when interested in them.
|
||||
workerTags, err := queries.FetchWorkerTags(ctx, uuid)
|
||||
if err != nil {
|
||||
return nil, workerTagError(err, "fetching tags of worker %s", uuid)
|
||||
}
|
||||
|
||||
convertedWorker := convertSqlcWorker(worker)
|
||||
convertedWorker.Tags = make([]*WorkerTag, len(workerTags))
|
||||
for index := range workerTags {
|
||||
tag := workerTags[index].WorkerTag
|
||||
convertedTag := convertSqlcWorkerTag(tag)
|
||||
convertedWorker.Tags[index] = &convertedTag
|
||||
}
|
||||
|
||||
return &convertedWorker, nil
|
||||
}
|
||||
|
||||
func (db *DB) DeleteWorker(ctx context.Context, uuid string) error {
|
||||
@ -212,8 +226,8 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e
|
||||
// expected by the rest of the code. This is mostly in place to aid in the GORM
|
||||
// to SQLC migration. It is intended that eventually the rest of the code will
|
||||
// use the same SQLC-generated model.
|
||||
func convertSqlcWorker(worker sqlc.Worker) (*Worker, error) {
|
||||
dbWorker := Worker{
|
||||
func convertSqlcWorker(worker sqlc.Worker) Worker {
|
||||
return Worker{
|
||||
Model: Model{
|
||||
ID: uint(worker.ID),
|
||||
CreatedAt: worker.CreatedAt,
|
||||
@ -233,8 +247,22 @@ func convertSqlcWorker(worker sqlc.Worker) (*Worker, error) {
|
||||
StatusRequested: api.WorkerStatus(worker.StatusRequested),
|
||||
LazyStatusRequest: worker.LazyStatusRequest != 0,
|
||||
SupportedTaskTypes: worker.SupportedTaskTypes,
|
||||
// TODO: Tags []*WorkerTag `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"`
|
||||
}
|
||||
|
||||
return &dbWorker, nil
|
||||
}
|
||||
|
||||
// convertSqlcWorkerTag converts a worker tag from the SQLC-generated model to
|
||||
// the model expected by the rest of the code. This is mostly in place to aid in
|
||||
// the GORM to SQLC migration. It is intended that eventually the rest of the
|
||||
// code will use the same SQLC-generated model.
|
||||
func convertSqlcWorkerTag(tag sqlc.WorkerTag) WorkerTag {
|
||||
return WorkerTag{
|
||||
Model: Model{
|
||||
ID: uint(tag.ID),
|
||||
CreatedAt: tag.CreatedAt,
|
||||
UpdatedAt: tag.UpdatedAt.Time,
|
||||
},
|
||||
UUID: tag.UUID,
|
||||
Name: tag.Name,
|
||||
Description: tag.Description,
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user