Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions agent/server/server_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"net"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -319,5 +319,11 @@ func TestGRPCServer_ClientAutoClose(t *testing.T) {
}

// getRandomPort asks the kernel for a currently-free TCP port by
// binding to port 0, reading back the assigned port, and releasing
// the listener. NOTE: the port is only guaranteed free at the moment
// of the check; a small race window exists before the caller binds it.
func getRandomPort() int {
	listener, err := net.Listen("tcp", ":0")
	if err != nil {
		panic(fmt.Sprintf("failed to find free port: %v", err))
	}
	defer listener.Close()
	return listener.Addr().(*net.TCPAddr).Port
}
209 changes: 140 additions & 69 deletions agent/server/snykbroker/relay_instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ import (
const defaultSnykBroker = "snyk-broker"
const brokerPort = 7343

// restartRequest is sent to the restart channel by any code path that
// needs to restart the broker. The generation field ties the request
// to the broker lifecycle (Start invocation) that triggered it, so the
// consumer can discard stale requests that arrive after a restart has
// already happened.
type restartRequest struct {
	reason     string // machine-readable cause, e.g. "idle_timeout", "token_changed", "broker_exit"
	generation int32  // value of relayInstanceManager.generation when the request was produced
}

type RelayInstanceManager interface {
Start() error
Restart() error
Expand All @@ -38,14 +48,15 @@ type relayInstanceManager struct {
config config.AgentConfig
logger *zap.Logger
supervisor *Supervisor
running atomic.Bool
startCount atomic.Int32
lastStartTime atomic.Int64
tokenInfo *tokenInfo
running atomic.Bool
startCount atomic.Int32
generation atomic.Int32 // incremented on each Start(), used to deduplicate restart requests
tokenInfo *tokenInfo
operationsCounter *prometheus.CounterVec
transport *http.Transport

reflector *RegistrationReflector
restartCh chan restartRequest
}

type tokenInfo struct {
Expand Down Expand Up @@ -97,6 +108,7 @@ func NewRelayInstanceManager(
[]string{"integration", "alias", "operation", "status"},
),
transport: p.Transport,
restartCh: make(chan restartRequest, 1),
}

p.HttpServer.RegisterHandler(mgr)
Expand All @@ -106,6 +118,7 @@ func NewRelayInstanceManager(
}

mgr.reflector = p.Reflector
go mgr.restartConsumer()

if p.Lifecycle != nil {
p.Lifecycle.Append(fx.Hook{
Expand Down Expand Up @@ -212,6 +225,106 @@ func (r *relayInstanceManager) handleSystemCheck(w http.ResponseWriter, req *htt

}

// requestRestart enqueues a restart request tagged with the given
// generation. The channel has capacity one: if a restart is already
// pending, the new request is dropped, since the pending restart will
// cover it anyway.
func (r *relayInstanceManager) requestRestart(reason string, gen int32) {
	req := restartRequest{reason: reason, generation: gen}
	select {
	case r.restartCh <- req:
		r.logger.Info("Restart requested", zap.String("reason", reason), zap.Int32("generation", gen))
	default:
		// Non-blocking send failed: a request is already queued.
		r.logger.Debug("Restart already pending, dropping duplicate",
			zap.String("reason", reason), zap.Int32("generation", gen))
	}
}

// restartConsumer is the single goroutine that processes restart
// requests. It deduplicates by generation: if the broker has already
// moved to a newer generation by the time we read the request, the
// request is stale and we skip it.
//
// It also doubles as the idle watchdog: every 15 seconds (the ticker
// interval below) it checks shouldRestart() and produces a restart
// request if the broker has been idle too long.
//
// NOTE(review): this goroutine has no explicit stop signal; it only
// exits via the !running check after a failed restart attempt.
// Confirm that letting it live for the process lifetime is intended,
// or tie it to lifecycle shutdown with a done channel.
func (r *relayInstanceManager) restartConsumer() {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	for {
		var req restartRequest
		select {
		case req = <-r.restartCh:
			// explicit restart request
		case <-ticker.C:
			// Watchdog tick: synthesize a request for the current
			// generation when the idle condition holds; otherwise
			// go back to waiting.
			if ok, reason := r.shouldRestart(); ok {
				req = restartRequest{reason: reason, generation: r.generation.Load()}
			} else {
				continue
			}
		}

		// Drop requests produced against an older broker lifecycle:
		// the restart they asked for has effectively already happened.
		current := r.generation.Load()
		if req.generation != current {
			r.logger.Info("Ignoring stale restart request",
				zap.String("reason", req.reason),
				zap.Int32("requestGeneration", req.generation),
				zap.Int32("currentGeneration", current))
			continue
		}
		r.logger.Info("Processing restart request",
			zap.String("reason", req.reason),
			zap.Int32("generation", req.generation))
		r.emitOperationCounter("restart_"+req.reason, true)

		// Retry with backoff until the restart succeeds or we're
		// shut down. This is the persistent outer watchdog: no
		// matter how many times the broker or registration fails,
		// we keep trying.
		for attempt := 0; ; attempt++ {
			if err := r.Restart(); err != nil {
				delay := r.restartBackoff(attempt)
				r.logger.Error("Restart failed, will retry",
					zap.String("reason", req.reason),
					zap.Int("attempt", attempt+1),
					zap.Duration("backoff", delay),
					zap.Error(err))
				time.Sleep(delay)
				// If we were shut down while sleeping, stop.
				if !r.running.Load() {
					return
				}
				continue
			}
			break
		}
	}
}

// shouldRestart reports whether the broker should be restarted because
// relay traffic has been idle longer than RelayIdleTimeout. The second
// return value is the reason string to attach to the restart request.
func (r *relayInstanceManager) shouldRestart() (bool, string) {
	timeout := r.config.RelayIdleTimeout
	switch {
	case timeout == 0, r.reflector == nil:
		return false, ""
	case !r.config.HttpRelayReflectorMode.ReflectsTraffic():
		return false, ""
	case time.Since(r.reflector.LastTrafficTime()) < timeout:
		return false, ""
	}
	return true, "idle_timeout"
}

// restartBackoff returns the delay before retry number attempt:
// exponential starting at 5s (5s, 10s, 20s, 40s), capped at 60s.
//
// Fix: the previous implementation computed base << uint(attempt)
// before capping, which overflows int64 once attempt grows large
// enough (reachable after roughly an hour of continuous restart
// failures, since the retry loop increments attempt without bound),
// yielding a zero or negative delay and turning the retry loop into
// a hot spin. The shift is now performed only while it is known to
// stay below the cap.
func (r *relayInstanceManager) restartBackoff(attempt int) time.Duration {
	const (
		base     = 5 * time.Second
		maxDelay = 60 * time.Second
	)
	if attempt < 0 {
		// Defensive: callers always pass attempt >= 0.
		return base
	}
	if attempt >= 4 {
		// 5s << 4 = 80s already exceeds the cap, so clamp here; this
		// also keeps the shift below overflow territory.
		return maxDelay
	}
	return base << uint(attempt) // 5s, 10s, 20s, 40s
}

var errSkipBroker = errors.New("NoBrokerToken")

func (r *relayInstanceManager) Restart() error {
Expand Down Expand Up @@ -300,6 +413,8 @@ func (r *relayInstanceManager) Start() error {
return fmt.Errorf("already started")
}

r.generation.Add(1)

executable := defaultSnykBroker

if directPath := os.Getenv("SNYK_BROKER_PATH"); directPath != "" {
Expand Down Expand Up @@ -354,98 +469,51 @@ func (r *relayInstanceManager) Start() error {
panic(err)
}

gen := r.generation.Load()
done := make(chan struct{})

// Auto-register: poll for token changes and request restart if changed.
if r.config.AutoRegisterFrequency != 0 {
go func() {
for {
if !r.running.Load() {
return
}
select {
case <-time.After(r.config.AutoRegisterFrequency):

info, err := r.getUrlAndToken()
if err != nil {
r.logger.Error("Unable to auto register", zap.Error(err))
continue
}
if info.HasChanged {
r.logger.Info("Auto registered broker, token has changed, restarting")
err = r.Restart()
if err != nil {
r.logger.Error("Unable to auto register restart", zap.Error(err))
continue
}
r.requestRestart("token_changed", gen)
}

case <-done:
return
}
}
}()
}

// When the WebSocket tunnel through the reflector dies (infrastructure
// timeout, network error, etc), restart the broker immediately so it can
// re-establish a fresh connection. A cooldown prevents restart loops during
// startup or rapid successive failures.
// WebSocket tunnel death: request restart when the primus tunnel closes.
if r.reflector != nil && r.config.HttpRelayReflectorMode.ReflectsRegistration() {
const tunnelDeathCooldown = 30 * time.Second
r.reflector.SetOnWSTunnelClose(func() {
if !r.running.Load() {
return
}
sinceLastStart := time.Since(time.UnixMilli(r.lastStartTime.Load()))
if sinceLastStart < tunnelDeathCooldown {
r.logger.Info("WebSocket tunnel closed but within cooldown, skipping restart",
zap.Duration("sinceLastStart", sinceLastStart),
zap.Duration("cooldown", tunnelDeathCooldown),
)
return
}
r.logger.Warn("WebSocket tunnel died, restarting broker")
r.emitOperationCounter("ws_tunnel_restart", true)
if err := r.Restart(); err != nil {
r.logger.Error("Unable to restart broker after WebSocket tunnel death", zap.Error(err))
}
r.requestRestart("ws_tunnel_death", gen)
})
}

if r.config.RelayIdleTimeout != 0 && r.reflector != nil && r.config.HttpRelayReflectorMode.ReflectsTraffic() {
go func() {
for {
if !r.running.Load() {
return
}
select {
case <-time.After(r.config.RelayIdleTimeout):
if !r.running.Load() {
return
}
idle := time.Since(r.reflector.LastTrafficTime())
if idle >= r.config.RelayIdleTimeout {
r.logger.Info("No relay traffic detected, restarting broker",
zap.Duration("idle", idle),
zap.Duration("timeout", r.config.RelayIdleTimeout),
)
err := r.Restart()
r.emitOperationCounter("idle_restart", err == nil)
if err != nil {
r.logger.Error("Unable to restart broker on idle timeout", zap.Error(err))
}
}
case <-done:
return
}
}
}()
}

go func() {

defer close(done)

// On any non-clean exit, request a restart so the watchdog
// picks it up. Only skip for clean shutdown (!running) or
// intentional skip (no token / dry run).
requestRestartOnExit := false
defer func() {
if requestRestartOnExit && r.running.Load() {
r.requestRestart("broker_exit", gen)
}
}()

var info *tokenInfo
var errx error

Expand Down Expand Up @@ -477,9 +545,8 @@ func (r *relayInstanceManager) Start() error {
}

if errx != nil {
// In this case we will fail out of start which will shut down
// initialization and exit.
err = errx
requestRestartOnExit = true
return
}

Expand Down Expand Up @@ -534,8 +601,12 @@ func (r *relayInstanceManager) Start() error {
brokerEnv,
r.config.FailWaitTime,
)
// Only panic on max retries during the very first start.
// On subsequent restarts, return the error so the recovery
// loop can retry.
r.supervisor.panicOnMaxRetries = r.startCount.Load() == 0
r.startCount.Add(1)
r.lastStartTime.Store(time.Now().UnixMilli())
requestRestartOnExit = true
supervisor := r.supervisor
err = supervisor.Start(5, 10*time.Second)
r.emitOperationCounter("broker_start", err == nil)
Expand Down
38 changes: 38 additions & 0 deletions agent/server/snykbroker/relay_instance_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,44 @@ func createTestRelayInstanceManager(t *testing.T, controller *gomock.Controller,
return mgr
}

// TestBrokerSelfHealingRestart kills the running broker process and
// verifies that the restart consumer brings it back up, bumping both
// startCount and generation.
func TestBrokerSelfHealingRestart(t *testing.T) {
	controller := gomock.NewController(t)
	defer controller.Finish()

	mgr := createTestRelayInstanceManager(t, controller, nil, false, defaultIntegrationInfo)
	instance := mgr.Instance()

	// Wait for the broker to be running before poking at it.
	require.Eventually(t, func() bool {
		return instance.supervisor != nil && instance.supervisor.Pid() > 0
	}, 5*time.Second, 10*time.Millisecond, "Broker should be running")

	initialStartCount := instance.startCount.Load()
	initialGeneration := instance.generation.Load()

	// Kill the broker process to simulate unexpected death.
	// The supervisor will exhaust retries and the restart consumer
	// will process the broker_exit restart request.
	pid := instance.supervisor.Pid()
	require.Greater(t, pid, 0, "broker pid should be known")
	process, err := os.FindProcess(pid)
	require.NoError(t, err)
	// Fix: the Kill error was previously ignored; a failed kill would
	// make the Eventually below time out with a misleading message.
	require.NoError(t, process.Kill())

	// Verify the restart count and generation both increase,
	// indicating the restart consumer processed the request.
	require.Eventually(t, func() bool {
		return instance.startCount.Load() > initialStartCount
	}, 30*time.Second, 100*time.Millisecond, "Broker should have been restarted after unexpected exit")

	require.Greater(t, instance.generation.Load(), initialGeneration,
		"Generation should have incremented after restart")

	require.NoError(t, mgr.Close())
}

func TestIdleTimeoutDetectsIdleReflector(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()
Expand Down
Loading
Loading