128 lines
3.4 KiB
Go
128 lines
3.4 KiB
Go
package stresser
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"projects.blender.org/studio/flamenco/internal/worker"
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
const (
|
|
// For fetching the task to stress test.
|
|
durationNoTask = 1 * time.Second // ... if there is no task now.
|
|
durationFetchFailed = 2 * time.Second // ... if fetching failed somehow.
|
|
)
|
|
|
|
var (
|
|
ErrTaskReassigned = worker.ErrTaskReassigned
|
|
ErrTaskUpdateRejected = errors.New("task update was rejected")
|
|
)
|
|
|
|
func GetFlamencoClient(
|
|
ctx context.Context,
|
|
config worker.WorkerConfigWithCredentials,
|
|
) worker.FlamencoClient {
|
|
startupCtx, startupCtxCancel := context.WithTimeout(ctx, 10*time.Second)
|
|
defer startupCtxCancel()
|
|
|
|
client, startupState := worker.RegisterOrSignOn(startupCtx, config)
|
|
if startupState != api.WorkerStatusAwake {
|
|
log.Fatal().Str("requestedStartupState", string(startupState)).Msg("stresser should always be awake")
|
|
}
|
|
|
|
ackStateChange(ctx, client, startupState)
|
|
|
|
return client
|
|
}
|
|
|
|
func fetchTask(ctx context.Context, client worker.FlamencoClient) *api.AssignedTask {
|
|
// Initially don't wait at all.
|
|
var wait time.Duration
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Debug().Msg("task fetching interrupted by context cancellation")
|
|
return nil
|
|
case <-time.After(wait):
|
|
}
|
|
|
|
log.Debug().Msg("fetching tasks")
|
|
resp, err := client.ScheduleTaskWithResponse(ctx)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error obtaining task")
|
|
wait = durationFetchFailed
|
|
continue
|
|
}
|
|
switch {
|
|
case resp.JSON200 != nil:
|
|
return resp.JSON200
|
|
case resp.JSON423 != nil:
|
|
log.Fatal().Str("requestedStatus", string(resp.JSON423.StatusRequested)).
|
|
Msg("Manager requests status change, stresser does not support this")
|
|
return nil
|
|
case resp.JSON403 != nil:
|
|
log.Error().
|
|
Int("code", resp.StatusCode()).
|
|
Str("error", string(resp.JSON403.Message)).
|
|
Msg("access denied")
|
|
wait = durationFetchFailed
|
|
case resp.StatusCode() == http.StatusNoContent:
|
|
log.Debug().Msg("no task available")
|
|
wait = durationNoTask
|
|
default:
|
|
log.Warn().
|
|
Int("code", resp.StatusCode()).
|
|
Str("error", string(resp.Body)).
|
|
Msg("unable to obtain task for unknown reason")
|
|
wait = durationFetchFailed
|
|
}
|
|
}
|
|
}
|
|
|
|
func ackStateChange(ctx context.Context, client worker.FlamencoClient, state api.WorkerStatus) {
|
|
req := api.WorkerStateChangedJSONRequestBody{Status: state}
|
|
|
|
logger := log.With().Str("state", string(state)).Logger()
|
|
logger.Debug().Msg("notifying Manager of our state")
|
|
|
|
resp, err := client.WorkerStateChangedWithResponse(ctx, req)
|
|
if err != nil {
|
|
logger.Fatal().Err(err).Msg("unable to notify Manager of status change")
|
|
return
|
|
}
|
|
|
|
// The 'default' response is for error cases.
|
|
if resp.JSONDefault != nil {
|
|
logger.Fatal().
|
|
Str("httpCode", resp.HTTPResponse.Status).
|
|
Interface("error", resp.JSONDefault).
|
|
Msg("error sending status change to Manager")
|
|
return
|
|
}
|
|
}
|
|
|
|
func sendTaskUpdate(ctx context.Context, client worker.FlamencoClient, taskID string, update api.TaskUpdate) error {
|
|
resp, err := client.TaskUpdateWithResponse(ctx, taskID, api.TaskUpdateJSONRequestBody(update))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch resp.StatusCode() {
|
|
case http.StatusNoContent:
|
|
return nil
|
|
case http.StatusConflict:
|
|
return worker.ErrTaskReassigned
|
|
default:
|
|
return fmt.Errorf("%w: task=%s", ErrTaskUpdateRejected, taskID)
|
|
}
|
|
}
|