Sybren A. Stüvel
b68e51976d
The Manager configuration has an option to disable the UPnP/SSDP server, and now it actually listens to it.
446 lines
13 KiB
Go
446 lines
13 KiB
Go
package main
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"flag"
|
|
"io/fs"
|
|
"net"
|
|
"net/url"
|
|
"os"
|
|
"os/signal"
|
|
"runtime"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/benbjohnson/clock"
|
|
"github.com/mattn/go-colorable"
|
|
"github.com/pkg/browser"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"projects.blender.org/studio/flamenco/internal/appinfo"
|
|
"projects.blender.org/studio/flamenco/internal/manager/api_impl"
|
|
"projects.blender.org/studio/flamenco/internal/manager/api_impl/dummy"
|
|
"projects.blender.org/studio/flamenco/internal/manager/config"
|
|
"projects.blender.org/studio/flamenco/internal/manager/eventbus"
|
|
"projects.blender.org/studio/flamenco/internal/manager/farmstatus"
|
|
"projects.blender.org/studio/flamenco/internal/manager/job_compilers"
|
|
"projects.blender.org/studio/flamenco/internal/manager/job_deleter"
|
|
"projects.blender.org/studio/flamenco/internal/manager/last_rendered"
|
|
"projects.blender.org/studio/flamenco/internal/manager/local_storage"
|
|
"projects.blender.org/studio/flamenco/internal/manager/persistence"
|
|
"projects.blender.org/studio/flamenco/internal/manager/sleep_scheduler"
|
|
"projects.blender.org/studio/flamenco/internal/manager/task_logs"
|
|
"projects.blender.org/studio/flamenco/internal/manager/task_state_machine"
|
|
"projects.blender.org/studio/flamenco/internal/manager/timeout_checker"
|
|
"projects.blender.org/studio/flamenco/internal/own_url"
|
|
"projects.blender.org/studio/flamenco/internal/upnp_ssdp"
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
"projects.blender.org/studio/flamenco/pkg/shaman"
|
|
"projects.blender.org/studio/flamenco/pkg/sysinfo"
|
|
)
|
|
|
|
var cliArgs struct {
|
|
version bool
|
|
writeConfig bool
|
|
delayResponses bool
|
|
setupAssistant bool
|
|
pprof bool
|
|
}
|
|
|
|
const (
|
|
developmentWebInterfacePort = 8081
|
|
|
|
webappEntryPoint = "index.html"
|
|
|
|
// dbOpenTimeout is the time the persistence layer gets to open the database.
|
|
// This includes database migrations, which can take some time to perform.
|
|
dbOpenTimeout = 1 * time.Minute
|
|
)
|
|
|
|
type shutdownFunc func()
|
|
|
|
func main() {
|
|
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
|
|
log.Logger = log.Output(output)
|
|
|
|
osDetail, err := sysinfo.Description()
|
|
if err != nil {
|
|
osDetail = err.Error()
|
|
}
|
|
log.Info().
|
|
Str("version", appinfo.ApplicationVersion).
|
|
Str("git", appinfo.ApplicationGitHash).
|
|
Str("releaseCycle", appinfo.ReleaseCycle).
|
|
Str("os", runtime.GOOS).
|
|
Str("osDetail", osDetail).
|
|
Str("arch", runtime.GOARCH).
|
|
Msgf("starting %v", appinfo.ApplicationName)
|
|
|
|
parseCliArgs()
|
|
if cliArgs.version {
|
|
return
|
|
}
|
|
|
|
startFlamenco := true
|
|
for startFlamenco {
|
|
startFlamenco = runFlamencoManager()
|
|
|
|
// After the first run, the setup assistant should not be forced any more.
|
|
// If the configuration is still incomplete it can still auto-trigger.
|
|
cliArgs.setupAssistant = false
|
|
|
|
if startFlamenco {
|
|
log.Info().
|
|
Str("version", appinfo.ApplicationVersion).
|
|
Str("os", runtime.GOOS).
|
|
Str("arch", runtime.GOARCH).
|
|
Msgf("restarting %v", appinfo.ApplicationName)
|
|
}
|
|
}
|
|
|
|
log.Info().Msg("stopping the Flamenco Manager process")
|
|
}
|
|
|
|
// runFlamencoManager starts the entire Flamenco Manager, and only returns after
|
|
// it has been completely shut down.
|
|
// Returns true if it should be restarted again.
|
|
func runFlamencoManager() bool {
|
|
// Load configuration.
|
|
configService := config.NewService()
|
|
err := configService.Load()
|
|
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
log.Error().Err(err).Msg("loading configuration")
|
|
}
|
|
|
|
if cliArgs.setupAssistant {
|
|
configService.ForceFirstRun()
|
|
}
|
|
isFirstRun, err := configService.IsFirstRun()
|
|
switch {
|
|
case err != nil:
|
|
log.Fatal().Err(err).Msg("unable to determine whether this is the first run of Flamenco or not")
|
|
case isFirstRun:
|
|
log.Info().Msg("This seems to be your first run of Flamenco! A webbrowser will open to help you set things up.")
|
|
}
|
|
|
|
if cliArgs.writeConfig {
|
|
err := configService.Save()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("could not write configuration file")
|
|
os.Exit(1)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// TODO: enable TLS via Let's Encrypt.
|
|
listen := configService.Get().Listen
|
|
_, port, _ := net.SplitHostPort(listen)
|
|
log.Info().Str("port", port).Msg("listening")
|
|
|
|
// Find our URLs.
|
|
urls, err := own_url.AvailableURLs("http", listen)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("unable to figure out my own URL")
|
|
}
|
|
|
|
// Construct the UPnP/SSDP server.
|
|
var ssdp *upnp_ssdp.Server
|
|
if configService.Get().SSDPDiscovery {
|
|
ssdp = makeAutoDiscoverable(urls)
|
|
} else {
|
|
log.Debug().Msg("UPnP/SSDP autodiscovery disabled in configuration")
|
|
}
|
|
|
|
// Construct the services.
|
|
persist := openDB(*configService)
|
|
defer persist.Close()
|
|
|
|
timeService := clock.New()
|
|
compiler, err := job_compilers.Load(timeService)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("error loading job compilers")
|
|
}
|
|
|
|
// Set up the event system.
|
|
eventBroker := eventbus.NewBroker()
|
|
socketio := eventbus.NewSocketIOForwarder()
|
|
eventBroker.AddForwarder(socketio)
|
|
mqttClient := eventbus.NewMQTTForwarder(configService.Get().MQTT.Client)
|
|
if mqttClient != nil {
|
|
eventBroker.AddForwarder(mqttClient)
|
|
}
|
|
|
|
localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath)
|
|
logStorage := task_logs.NewStorage(localStorage, timeService, eventBroker)
|
|
|
|
taskStateMachine := task_state_machine.NewStateMachine(persist, eventBroker, logStorage)
|
|
sleepScheduler := sleep_scheduler.New(timeService, persist, eventBroker)
|
|
lastRender := last_rendered.New(localStorage)
|
|
|
|
shamanServer := buildShamanServer(configService, isFirstRun)
|
|
jobDeleter := job_deleter.NewService(persist, localStorage, eventBroker, shamanServer)
|
|
|
|
farmStatus := farmstatus.NewService(persist, eventBroker)
|
|
|
|
flamenco := api_impl.NewFlamenco(
|
|
compiler, persist, eventBroker, logStorage, configService,
|
|
taskStateMachine, shamanServer, timeService, lastRender,
|
|
localStorage, sleepScheduler, jobDeleter, farmStatus)
|
|
|
|
e := buildWebService(flamenco, persist, ssdp, socketio, urls, localStorage)
|
|
|
|
timeoutChecker := timeout_checker.New(
|
|
configService.Get().TaskTimeout,
|
|
configService.Get().WorkerTimeout,
|
|
timeService, persist, taskStateMachine, logStorage, eventBroker)
|
|
|
|
// The main context determines the lifetime of the application. All
|
|
// long-running goroutines need to keep an eye on this, and stop their work
|
|
// once it closes.
|
|
mainCtx, mainCtxCancel := context.WithCancel(context.Background())
|
|
|
|
triggerShutdown := func() {
|
|
// Notify that Flamenco is shutting down.
|
|
event := eventbus.NewLifeCycleEvent(api.LifeCycleEventTypeManagerShutdown)
|
|
eventBroker.BroadcastLifeCycleEvent(event)
|
|
|
|
// Give event bus some time to process the shutdown event.
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
// Cancel the main context, triggering an application-wide shutdown.
|
|
mainCtxCancel()
|
|
}
|
|
|
|
installSignalHandler(triggerShutdown)
|
|
|
|
if mqttClient != nil {
|
|
mqttClient.Connect(mainCtx)
|
|
}
|
|
|
|
// Before doing anything new, clean up in case we made a mess in an earlier run.
|
|
taskStateMachine.CheckStuck(mainCtx)
|
|
|
|
// All main goroutines should sync with this waitgroup. Once the waitgroup is
|
|
// done, the main() function will return and the process will stop.
|
|
wg := new(sync.WaitGroup)
|
|
|
|
// Run the "last rendered image" processor.
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
lastRender.Run(mainCtx)
|
|
}()
|
|
|
|
// Run a periodic integrity check on the database.
|
|
// When that check fails, the entire application should shut down.
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
persist.PeriodicIntegrityCheck(mainCtx,
|
|
configService.Get().DBIntegrityCheck,
|
|
mainCtxCancel)
|
|
}()
|
|
|
|
// Start the web server.
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
// No matter how this function ends, if the HTTP server goes down, so does
|
|
// the application.
|
|
defer mainCtxCancel()
|
|
|
|
err := runWebService(mainCtx, e, listen)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("HTTP server error, shutting down the application")
|
|
}
|
|
}()
|
|
|
|
// Start the UPnP/SSDP server.
|
|
if ssdp != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
ssdp.Run(mainCtx)
|
|
}()
|
|
}
|
|
|
|
// Start the timeout checker.
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
timeoutChecker.Run(mainCtx)
|
|
}()
|
|
|
|
// Run the Worker sleep scheduler.
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sleepScheduler.Run(mainCtx)
|
|
}()
|
|
|
|
// Run the Job Deleter.
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
jobDeleter.Run(mainCtx)
|
|
}()
|
|
|
|
// Run the Farm Status service.
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
farmStatus.Run(mainCtx)
|
|
}()
|
|
|
|
// Log the URLs last, hopefully that makes them more visible / encouraging to go to for users.
|
|
go func() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
logURLs(urls)
|
|
}()
|
|
|
|
// Open a webbrowser, but give the web service some time to start first.
|
|
if isFirstRun {
|
|
go openWebbrowser(mainCtx, urls[0])
|
|
}
|
|
|
|
// Notify that Flamenco has started.
|
|
{
|
|
event := eventbus.NewLifeCycleEvent(api.LifeCycleEventTypeManagerStartup)
|
|
eventBroker.BroadcastLifeCycleEvent(event)
|
|
}
|
|
|
|
// Allow the Flamenco API itself trigger a shutdown as well.
|
|
log.Debug().Msg("waiting for a shutdown request from Flamenco")
|
|
doRestart := flamenco.WaitForShutdown(mainCtx)
|
|
log.Info().Bool("willRestart", doRestart).Msg("going to shut down the service")
|
|
triggerShutdown()
|
|
|
|
wg.Wait()
|
|
log.Info().Bool("willRestart", doRestart).Msg("Flamenco Manager service shut down")
|
|
|
|
return doRestart
|
|
}
|
|
|
|
func buildShamanServer(configService *config.Service, isFirstRun bool) api_impl.Shaman {
|
|
if isFirstRun {
|
|
log.Info().Msg("Not starting Shaman storage service, as this is the first run of Flamenco. Configure the shared storage location first.")
|
|
return &dummy.DummyShaman{}
|
|
}
|
|
return shaman.NewServer(configService.Get().Shaman, nil)
|
|
}
|
|
|
|
// openWebbrowser starts a web browser after waiting for 1 second.
|
|
// Closing the context aborts the opening of the browser, but doesn't close the
|
|
// browser itself if has already started.
|
|
func openWebbrowser(ctx context.Context, url url.URL) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(1 * time.Second):
|
|
}
|
|
|
|
urlToTry := url.String()
|
|
if err := browser.OpenURL(urlToTry); err != nil {
|
|
log.Error().Err(err).Msgf("unable to open a browser to %s, please open it yourself or try any of the other URLs above", urlToTry)
|
|
return
|
|
}
|
|
log.Info().Str("url", urlToTry).Msgf("opened browser to the Flamenco interface")
|
|
}
|
|
|
|
func parseCliArgs() {
|
|
var quiet, debug, trace bool
|
|
|
|
flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.")
|
|
flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.")
|
|
flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.")
|
|
flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.")
|
|
flag.BoolVar(&cliArgs.writeConfig, "write-config", false, "Writes configuration to flamenco-manager.yaml, then exits.")
|
|
flag.BoolVar(&cliArgs.delayResponses, "delay", false,
|
|
"Add a random delay to any HTTP responses. This aids in development of Flamenco Manager's web frontend.")
|
|
flag.BoolVar(&cliArgs.setupAssistant, "setup-assistant", false, "Open a webbrowser with the setup assistant.")
|
|
flag.BoolVar(&cliArgs.pprof, "pprof", false, "Expose profiler endpoints on /debug/pprof/.")
|
|
|
|
flag.Parse()
|
|
|
|
var logLevel zerolog.Level
|
|
switch {
|
|
case trace:
|
|
logLevel = zerolog.TraceLevel
|
|
case debug:
|
|
logLevel = zerolog.DebugLevel
|
|
case quiet:
|
|
logLevel = zerolog.WarnLevel
|
|
default:
|
|
logLevel = zerolog.InfoLevel
|
|
}
|
|
zerolog.SetGlobalLevel(logLevel)
|
|
}
|
|
|
|
// openDB opens the database or dies.
|
|
func openDB(configService config.Service) *persistence.DB {
|
|
dsn := configService.Get().DatabaseDSN
|
|
if dsn == "" {
|
|
log.Fatal().Msg("configure the database in flamenco-manager.yaml")
|
|
}
|
|
|
|
dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), dbOpenTimeout)
|
|
defer dbCtxCancel()
|
|
persist, err := persistence.OpenDB(dbCtx, dsn)
|
|
if err != nil {
|
|
log.Fatal().
|
|
Err(err).
|
|
Str("dsn", dsn).
|
|
Msg("error opening database")
|
|
}
|
|
|
|
return persist
|
|
}
|
|
|
|
// installSignalHandler spawns a goroutine that handles incoming POSIX signals.
|
|
func installSignalHandler(shutdownFunc shutdownFunc) {
|
|
signals := make(chan os.Signal, 1)
|
|
signal.Notify(signals, os.Interrupt)
|
|
signal.Notify(signals, syscall.SIGTERM)
|
|
go func() {
|
|
for signum := range signals {
|
|
log.Info().Str("signal", signum.String()).Msg("signal received, shutting down")
|
|
shutdownFunc()
|
|
}
|
|
}()
|
|
}
|
|
|
|
func makeAutoDiscoverable(urls []url.URL) *upnp_ssdp.Server {
|
|
ssdp, err := upnp_ssdp.NewServer(log.Logger)
|
|
if err != nil {
|
|
strUrls := make([]string, len(urls))
|
|
for idx := range urls {
|
|
strUrls[idx] = urls[idx].String()
|
|
}
|
|
log.Error().Strs("urls", strUrls).Msg("Unable to create UPnP/SSDP server. " +
|
|
"This means that Workers will not be able to automatically find this Manager. " +
|
|
"Run them with the `-manager URL` argument, picking a URL from this list.")
|
|
return nil
|
|
}
|
|
|
|
ssdp.AddAdvertisementURLs(urls)
|
|
return ssdp
|
|
}
|
|
|
|
func logURLs(urls []url.URL) {
|
|
log.Info().Int("count", len(urls)).Msg("possible URLs at which to reach Flamenco Manager")
|
|
for _, url := range urls {
|
|
// Don't log this with something like `Str("url", url.String())`, because
|
|
// that puts `url=` in front of the URL. This can interfere with
|
|
// link-detection in the terminal. Having a space in front is much better,
|
|
// as that is guaranteed to count as word-delimiter for double-click
|
|
// word-selection.
|
|
log.Info().Msgf("- %s", url.String())
|
|
}
|
|
}
|