flamenco/internal/manager/job_deleter/job_deleter.go
Sybren A. Stüvel b39f116b0e Manager: after deleting a job, perform a database consistency check
Deleting jobs from the database can still sometimes cause consistency
errors, as if foreign key constraints aren't enabled. This check is there
to try and get a grip on things.
2024-01-11 20:03:53 +01:00

284 lines
8.8 KiB
Go

// Package job_deleter has functionality to delete jobs.
//
// Requesting deletion marks the job as "deletion requested" in the database.
// This is relatively fast, and persistent. After this, the job is queued for
// actual deletion by a different goroutine.
//
// At startup of the service the database is inspected and still-pending
// deletion requests are queued.
//
// SPDX-License-Identifier: GPL-3.0-or-later
package job_deleter
import (
"context"
"errors"
"fmt"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/internal/manager/webupdates"
"projects.blender.org/studio/flamenco/pkg/api"
"projects.blender.org/studio/flamenco/pkg/shaman"
)
// jobDeletionQueueSize determines how many job deletion requests can be kept in
// memory at a time. This is variable to allow unit testing with lower limits.
// NewService() uses it as the buffer size of the Service.queue channel.
var jobDeletionQueueSize = defaultJobDeletionQueueSize
const (
// defaultJobDeletionQueueSize is the initial value of jobDeletionQueueSize.
defaultJobDeletionQueueSize = 100
// jobDeletionCheckInterval determines how often the database is checked for
// jobs that have been requested to be deleted. Run() performs this check
// after this much time has passed without any other event, and only when the
// in-memory queue is empty.
jobDeletionCheckInterval = 1 * time.Minute
)
// Service can mark jobs as "deletion requested", as well as delete those jobs
// in a background goroutine.
//
// Use NewService() to construct one; that is where the queue channel is
// created.
type Service struct {
// Injected dependencies.
// persist is the database layer: it is used to request (mass) job deletion,
// fetch jobs, delete jobs, and request an integrity check.
persist PersistenceService
// storage is used to remove the on-disk job storage via RemoveJobStorage().
storage Storage
// changeBroadcaster is used to broadcast job updates via BroadcastJobUpdate().
changeBroadcaster ChangeBroadcaster
// shaman is queried with IsEnabled() and used to erase job checkouts via
// EraseCheckout().
shaman Shaman
// queue is a buffered channel (capacity jobDeletionQueueSize at construction
// time) that is consumed by Run().
queue chan string // Job UUIDs to process.
}
// NewService constructs a new job deletion service.
//
// The returned service has a buffered deletion queue with room for
// jobDeletionQueueSize job UUIDs. Nothing is processed until Run() is called.
//
// `shaman` is documented as being allowed to be nil if Shaman checkouts
// shouldn't be erased.
// NOTE(review): this package calls shaman.IsEnabled(), so confirm that the
// value passed here tolerates that call.
func NewService(
	persist PersistenceService,
	storage Storage,
	changeBroadcaster ChangeBroadcaster,
	shaman Shaman,
) *Service {
	service := new(Service)

	// Injected dependencies.
	service.persist = persist
	service.storage = storage
	service.changeBroadcaster = changeBroadcaster
	service.shaman = shaman

	// Internal state.
	service.queue = make(chan string, jobDeletionQueueSize)

	return service
}
// QueueJobDeletion marks the given job as "deletion requested" in the
// database, broadcasts the updated job, and then tries to hand the job UUID to
// the Run() goroutine via the in-memory queue.
//
// An error is returned only when the database request fails. A full in-memory
// queue is not treated as an error: the attempt to queue is given up after
// 100 ms and nil is returned. Run() has a periodic database check for jobs
// that were marked for deletion without getting into the queue.
func (s *Service) QueueJobDeletion(ctx context.Context, job *persistence.Job) error {
	jobLog := log.With().Str("job", job.UUID).Logger()
	jobLog.Info().Msg("job deleter: queueing job for deletion")

	if err := s.persist.RequestJobDeletion(ctx, job); err != nil {
		return fmt.Errorf("requesting job deletion: %w", err)
	}

	// Broadcast that this job was queued for deletion.
	s.changeBroadcaster.BroadcastJobUpdate(webupdates.NewJobUpdate(job))

	// Let the Run() goroutine know this job is ready for deletion, but don't
	// wait long for room in the queue.
	giveUp := time.After(100 * time.Millisecond)
	select {
	case s.queue <- job.UUID:
		jobLog.Debug().Msg("job deleter: job succesfully queued for deletion")
	case <-giveUp:
		jobLog.Debug().Msg("job deleter: job deletion queue is full")
	}

	return nil
}
// QueueMassJobDeletion requests deletion of multiple jobs at once. Which jobs
// are selected is decided by persist.RequestJobMassDeletion(), which receives
// `lastUpdatedMax` (presumably an upper bound on the jobs' last-updated
// timestamp; verify against the persistence layer).
//
// An error is returned only when that database request fails. Queueing the
// returned job UUIDs and broadcasting the job updates happens in a separate
// goroutine, so this function returns before that work is done.
func (s *Service) QueueMassJobDeletion(ctx context.Context, lastUpdatedMax time.Time) error {
	jobUUIDs, err := s.persist.RequestJobMassDeletion(ctx, lastUpdatedMax)
	if err != nil {
		return fmt.Errorf("requesting mass job deletion: %w", err)
	}

	massLog := log.With().Time("lastUpdatedMax", lastUpdatedMax).Logger()
	massLog.Info().
		Int("numjobs", len(jobUUIDs)).
		Msg("job deleter: queueing multiple jobs for deletion")

	// Do the poking of the job deleter, and broadcasting of the job deletion, in
	// the background. The main work is done, and the rest can be done
	// asynchronously. A fresh background context is used instead of `ctx`, so
	// the goroutine does not share the caller's context.
	go s.broadcastAndQueueMassJobDeletion(context.Background(), jobUUIDs, massLog)

	return nil
}
// broadcastAndQueueMassJobDeletion handles the follow-up work of a mass
// deletion request. For each job UUID it first tries to put the UUID on the
// deletion queue (giving up after 100 ms when the queue is full), and then
// fetches the job from the database in order to broadcast a job update.
//
// Failure to fetch a job is logged and otherwise ignored; the remaining jobs
// are still processed.
func (s *Service) broadcastAndQueueMassJobDeletion(ctx context.Context, jobUUIDs []string, logger zerolog.Logger) {
	for i := range jobUUIDs {
		jobUUID := jobUUIDs[i]

		// Let the Run() goroutine know this job is ready for deletion.
		giveUp := time.After(100 * time.Millisecond)
		select {
		case s.queue <- jobUUID:
			logger.Debug().Msg("job deleter: job succesfully queued for deletion")
		case <-giveUp:
			logger.Trace().Msg("job deleter: job deletion queue is full")
		}

		// Broadcast that the job was queued for deletion. This requires the full
		// job, hence the database query.
		dbJob, err := s.persist.FetchJob(ctx, jobUUID)
		if err == nil {
			s.changeBroadcaster.BroadcastJobUpdate(webupdates.NewJobUpdate(dbJob))
			continue
		}

		logger.Debug().
			Str("uuid", jobUUID).
			Err(err).
			Msg("job deleter: unable to fetch job to send updates")
	}
}
// WhatWouldBeDeleted reports what deleting the given job would do, without
// deleting anything. Currently the only reported item is whether the job's
// Shaman checkout could be deleted.
func (s *Service) WhatWouldBeDeleted(job *persistence.Job) api.JobDeletionInfo {
	jobLog := log.With().Str("job", job.UUID).Logger()
	jobLog.Info().Msg("job deleter: checking what job deletion would do")

	var info api.JobDeletionInfo
	info.ShamanCheckout = s.canDeleteShamanCheckout(jobLog, job)
	return info
}
// Run processes the queue of deletion requests. It starts by building up a
// queue of still-pending job deletions.
//
// This function blocks until `ctx` is done. Jobs are deleted one at a time, in
// the order in which their UUIDs arrive on the queue.
func (s *Service) Run(ctx context.Context) {
	s.queuePendingDeletions(ctx)

	log.Debug().Msg("job deleter: running")
	defer log.Debug().Msg("job deleter: shutting down")

	for {
		// A fresh timer is created on every iteration, so it only fires after
		// jobDeletionCheckInterval has passed without any other event.
		idle := time.After(jobDeletionCheckInterval)

		select {
		case <-ctx.Done():
			return

		case uuid := <-s.queue:
			// The returned error is discarded here.
			_ = s.deleteJob(ctx, uuid)

		case <-idle:
			// Inspect the database to see if there was anything marked for deletion
			// without getting into our queue. This can happen when lots of jobs are
			// queued in quick succession, as then the queue channel gets full.
			if len(s.queue) > 0 {
				// There is still queued work, so handle that first.
				continue
			}
			s.queuePendingDeletions(ctx)
		}
	}
}
// queuePendingDeletions queries the database for jobs that are marked as
// "deletion requested" and puts their UUIDs on the in-memory queue.
//
// When the queue has no room for 100 ms, queueing stops and the remaining jobs
// are left for a later call. A database error is logged as a warning and
// nothing is queued.
func (s *Service) queuePendingDeletions(ctx context.Context) {
	log.Debug().Msg("job deleter: finding pending deletions")

	pendingUUIDs, err := s.persist.FetchJobsDeletionRequested(ctx)
	if err != nil {
		log.Warn().AnErr("cause", err).Msg("job deleter: could not find jobs to be deleted in database")
		return
	}

	for position, pendingUUID := range pendingUUIDs {
		select {
		case s.queue <- pendingUUID:
			log.Debug().Str("job", pendingUUID).Msg("job deleter: job queued for deletion")
			continue
		case <-time.After(100 * time.Millisecond):
			// Fall through to the "queue is full" handling below.
		}

		// The queue is full. Everything from `position` onwards did not make it
		// into the queue.
		log.Info().
			Int("deletionsQueued", len(s.queue)).
			Int("deletionsRemaining", len(pendingUUIDs)-position).
			Stringer("checkInterval", jobDeletionCheckInterval).
			Msg("job deleter: job deletion queue is full, remaining deletions will be picked up later")
		return
	}
}
// deleteJob performs the actual deletion of the job with the given UUID.
//
// The steps are, in order:
//  1. erase the job's Shaman checkout (if applicable),
//  2. remove the job's storage (logs, last-rendered images, etc.),
//  3. remove the job from the database,
//  4. broadcast that the job was deleted,
//  5. request a database integrity check.
//
// The first failing step aborts the deletion; its error is logged and
// returned. Every failure is logged here because Run() does not act on the
// returned error.
func (s *Service) deleteJob(ctx context.Context, jobUUID string) error {
	logger := log.With().Str("job", jobUUID).Logger()

	// This has to happen before the job is removed from the database, as erasing
	// the Shaman checkout requires info that is fetched from the database.
	if err := s.deleteShamanCheckout(ctx, logger, jobUUID); err != nil {
		// deleteShamanCheckout() does not log all of its failures (a failing
		// database fetch is only returned), so make sure the abort is visible.
		logger.Error().Err(err).Msg("job deleter: error removing Shaman checkout, job deletion aborted")
		return err
	}

	logger.Debug().Msg("job deleter: removing logs, last-rendered images, etc.")
	if err := s.storage.RemoveJobStorage(ctx, jobUUID); err != nil {
		logger.Error().Err(err).Msg("job deleter: error removing job logs, job deletion aborted")
		return err
	}

	logger.Debug().Msg("job deleter: removing job from database")
	if err := s.persist.DeleteJob(ctx, jobUUID); err != nil {
		logger.Error().Err(err).Msg("job deleter: unable to remove job from database")
		return err
	}

	// Broadcast that this job was deleted. This only contains the UUID and the
	// "was deleted" flag, because there's nothing else left. And I don't want to
	// do a full database query for something we'll delete anyway.
	wasDeleted := true
	jobUpdate := api.SocketIOJobUpdate{
		Id:         jobUUID,
		WasDeleted: &wasDeleted,
	}
	s.changeBroadcaster.BroadcastJobUpdate(jobUpdate)

	logger.Info().Msg("job deleter: job removal complete")

	// Request a consistency check on the database. In the past there have been
	// some issues after deleting a job.
	s.persist.RequestIntegrityCheck()

	return nil
}
// canDeleteShamanCheckout returns whether deleting the given job would also
// erase its Shaman checkout. It only inspects state; nothing is deleted.
//
// It returns false when no Shaman was injected (NewService() documents that
// `shaman` can be nil), when Shaman is not enabled, or when the job has no
// Shaman checkout ID.
func (s *Service) canDeleteShamanCheckout(logger zerolog.Logger, job *persistence.Job) bool {
	// NOTE: Keep this logic and the deleteShamanCheckout() function in sync.

	// Calling a method on a nil interface value panics, so a nil Shaman is
	// treated the same as a disabled one.
	if s.shaman == nil || !s.shaman.IsEnabled() {
		logger.Debug().Msg("job deleter: Shaman not enabled, cannot delete job files")
		return false
	}

	checkoutID := job.Storage.ShamanCheckoutID
	if checkoutID == "" {
		logger.Debug().Msg("job deleter: job was not created with Shaman (or before Flamenco v3.2), cannot delete job files")
		return false
	}

	return true
}
// deleteShamanCheckout erases the Shaman checkout of the job with the given
// UUID, if there is one.
//
// It returns nil without doing anything when no Shaman was injected
// (NewService() documents that `shaman` can be nil), when Shaman is not
// enabled, or when the job has no Shaman checkout ID. A checkout that does not
// exist is not considered an error either.
//
// An error is returned when the job cannot be fetched from the database, or
// when erasing the checkout fails for another reason than it not existing.
func (s *Service) deleteShamanCheckout(ctx context.Context, logger zerolog.Logger, jobUUID string) error {
	// NOTE: Keep this logic and the canDeleteShamanCheckout() function in sync.

	// Calling a method on a nil interface value panics, so a nil Shaman is
	// treated the same as a disabled one.
	if s.shaman == nil || !s.shaman.IsEnabled() {
		logger.Debug().Msg("job deleter: Shaman not enabled, skipping job file deletion")
		return nil
	}

	// To erase the Shaman checkout we need more info than just its UUID.
	dbJob, err := s.persist.FetchJob(ctx, jobUUID)
	if err != nil {
		return fmt.Errorf("unable to fetch job from database: %w", err)
	}

	checkoutID := dbJob.Storage.ShamanCheckoutID
	if checkoutID == "" {
		logger.Info().Msg("job deleter: job was not created with Shaman (or before Flamenco v3.2), skipping job file deletion")
		return nil
	}

	err = s.shaman.EraseCheckout(checkoutID)
	switch {
	case errors.Is(err, shaman.ErrDoesNotExist):
		// Nothing to erase means the desired end state is already reached.
		logger.Info().Msg("job deleter: Shaman checkout directory does not exist, ignoring")
		return nil
	case err != nil:
		logger.Info().Err(err).Msg("job deleter: Shaman checkout directory could not be erased")
		return err
	}

	return nil
}