flamenco/internal/stresser/stresser.go
Sybren A. Stüvel 02fac6a4df Change Go package name from git.blender.org to projects.blender.org
Change the package base name of the Go code, from
`git.blender.org/flamenco` to `projects.blender.org/studio/flamenco`.

The old location, `git.blender.org`, has no longer been use since the
[migration to Gitea][1]. The new package names now reflect the actual
location where Flamenco is hosted.

[1]: https://code.blender.org/2023/02/new-blender-development-infrastructure/
2023-08-01 12:42:31 +02:00

144 lines
3.0 KiB
Go

package stresser
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/worker"
"projects.blender.org/studio/flamenco/pkg/api"
)
const (
// For the actual stress test.
durationWaitStress = 0 * time.Millisecond
reportPeriod = 2 * time.Second
)
var (
numRequests = 0
numFailed = 0
startTime time.Time
mutex = sync.RWMutex{}
)
func Run(ctx context.Context, client worker.FlamencoClient) {
// Get a task.
task := fetchTask(ctx, client)
if task == nil {
log.Fatal().Msg("error obtaining task, shutting down stresser")
return
}
logger := log.With().Str("task", task.Uuid).Logger()
logger.Info().
Str("job", task.Job).
Msg("obtained task")
// Mark the task as active.
err := sendTaskUpdate(ctx, client, task.Uuid, api.TaskUpdate{
Activity: ptr("Stress testing"),
TaskStatus: ptr(api.TaskStatusActive),
})
if err != nil {
logger.Warn().Err(err).Msg("Manager rejected task becoming active. Going to stress it anyway.")
}
startTime = time.Now()
go reportStatisticsLoop(ctx)
// Do the stress test.
var wait time.Duration
for {
select {
case <-ctx.Done():
log.Debug().Msg("stresser interrupted by context cancellation")
return
case <-time.After(wait):
}
// stressBySendingTaskUpdate(ctx, client, task)
stressByRequestingTask(ctx, client)
wait = durationWaitStress
}
}
func stressByRequestingTask(ctx context.Context, client worker.FlamencoClient) {
increaseNumRequests()
task := fetchTask(ctx, client)
if task == nil {
increaseNumFailed()
log.Info().Msg("error obtaining task")
}
}
func stressBySendingTaskUpdate(ctx context.Context, client worker.FlamencoClient, task *api.AssignedTask) {
logLine := "This is a log-line for stress testing. It will be repeated more than once.\n"
logToSend := strings.Repeat(logLine, 5)
increaseNumRequests()
mutex.RLock()
update := api.TaskUpdate{
Activity: ptr(fmt.Sprintf("stress test update %v", numRequests)),
Log: &logToSend,
}
mutex.RUnlock()
err := sendTaskUpdate(ctx, client, task.Uuid, update)
if err != nil {
log.Info().Err(err).Str("task", task.Uuid).Msg("Manager rejected task update")
increaseNumFailed()
}
}
func ptr[T any](value T) *T {
return &value
}
func increaseNumRequests() {
mutex.Lock()
defer mutex.Unlock()
numRequests++
}
func increaseNumFailed() {
mutex.Lock()
defer mutex.Unlock()
numFailed++
}
func reportStatistics() {
mutex.RLock()
defer mutex.RUnlock()
duration := time.Since(startTime)
durationInSeconds := float64(duration) / float64(time.Second)
reqPerSecond := float64(numRequests) / durationInSeconds
log.Info().
Int("numRequests", numRequests).
Int("numFailed", numFailed).
Str("duration", duration.String()).
Float64("requestsPerSecond", reqPerSecond).
Msg("stress progress")
}
func reportStatisticsLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(reportPeriod):
reportStatistics()
}
}
}