diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index cd37e5a7c3..0debb0967b 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -13,7 +13,10 @@ steps: - name: ":go::robot_face: Lint" key: check-code-committed command: .buildkite/steps/check-code-committed.sh - if_changed: "{go.mod,go.sum,**.go,.buildkite/steps/check-code-committed.sh}" + if_changed: + - go.{mod,sum} + - "**.go" + - .buildkite/steps/check-code-committed.sh plugins: - docker-compose#v4.14.0: config: .buildkite/docker-compose.yml @@ -21,8 +24,25 @@ steps: mount-buildkite-agent: true run: lint + - name: ":go::robot_face: Check protobuf generation" + key: check-protobuf-generation + command: .buildkite/steps/check-protobuf-generation.sh + if_changed: + - api/proto/** + - .buildkite/steps/check-protobuf-generation.sh + plugins: + - docker-compose#v4.14.0: + config: .buildkite/docker-compose.yml + cli-version: 2 + mount-buildkite-agent: true + run: lint + - group: ":go::scientist: Tests and Coverage" - if_changed: "{go.mod,go.sum,**.go,**/fixtures/**,.buildkite/steps/{tests,test-coverage-report}.sh}" + if_changed: + - go.{mod,sum} + - "**.go" + - "**/fixtures/**" + - .buildkite/steps/{tests,test-coverage-report}.sh steps: - name: ":linux: Linux AMD64 Tests" key: test-linux-amd64 @@ -30,7 +50,7 @@ steps: parallelism: 2 artifact_paths: - junit-*.xml - - "coverage/**/*" + - "coverage-*/**" plugins: - docker-compose#v4.14.0: config: .buildkite/docker-compose.yml @@ -51,7 +71,7 @@ steps: parallelism: 2 artifact_paths: - junit-*.xml - - "coverage/**/*" + - "coverage-*/**" agents: queue: $AGENT_RUNNERS_LINUX_ARM64_QUEUE plugins: @@ -74,7 +94,7 @@ steps: parallelism: 2 artifact_paths: - junit-*.xml - - "coverage/**/*" + - "coverage-*/**" agents: queue: $AGENT_RUNNERS_WINDOWS_QUEUE plugins: @@ -93,7 +113,7 @@ steps: parallelism: 3 artifact_paths: - junit-*.xml - - "coverage/**/*" + - "coverage-*/**" agents: queue: $AGENT_RUNNERS_LINUX_ARM64_QUEUE plugins: @@ -112,7 +132,7 @@ 
steps: - name: ":coverage: Test coverage report Linux ARM64" key: test-coverage-linux-arm64 - command: ".buildkite/steps/test-coverage-report.sh" + command: ".buildkite/steps/test-coverage-report.sh coverage-linux-arm64" artifact_paths: - "cover.html" - "cover.out" @@ -124,12 +144,11 @@ steps: cli-version: 2 run: agent - artifacts#v1.9.4: - download: "coverage/**" - step: test-linux-arm64 + download: "coverage-linux-arm64/**" - name: ":coverage: Test coverage report Linux AMD64" key: test-coverage-linux-amd64 - command: ".buildkite/steps/test-coverage-report.sh" + command: ".buildkite/steps/test-coverage-report.sh coverage-linux-amd64" artifact_paths: - "cover.html" - "cover.out" @@ -141,12 +160,11 @@ steps: cli-version: 2 run: agent - artifacts#v1.9.4: - download: "coverage/**" - step: test-linux-amd64 + download: "coverage-linux-amd64/**" - name: ":coverage: Test coverage report Linux ARM64 Race" key: test-coverage-linux-arm64-race - command: ".buildkite/steps/test-coverage-report.sh" + command: ".buildkite/steps/test-coverage-report.sh coverage-linux-arm64-race" artifact_paths: - "cover.html" - "cover.out" @@ -158,8 +176,7 @@ steps: cli-version: 2 run: agent - artifacts#v1.9.4: - download: "coverage/**" - step: test-race-linux-arm64 + download: "coverage-linux-arm64-race/**" - label: ":writing_hand: Annotate with Test Failures" depends_on: diff --git a/.buildkite/steps/check-protobuf-generation.sh b/.buildkite/steps/check-protobuf-generation.sh new file mode 100755 index 0000000000..cbbad46245 --- /dev/null +++ b/.buildkite/steps/check-protobuf-generation.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env sh + +set -euf + +cd api/proto + +echo --- :buf: Installing buf... +go install github.com/bufbuild/buf/cmd/buf@v1.61.0 +go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.10 +go install connectrpc.com/connect/cmd/protoc-gen-connect-go@v1.19.1 + +echo --- :connectrpc: Checking protobuf file generation... +buf generate +if ! 
git diff --no-ext-diff --exit-code; then + echo ^^^ +++ + echo "Generated protobuf files are out of sync with the source code" + echo "Please run \`buf generate\` in the api/proto directory locally, and commit the result." + exit 1 +fi diff --git a/.buildkite/steps/test-coverage-report.sh b/.buildkite/steps/test-coverage-report.sh index 28991f44e4..5294b7af79 100755 --- a/.buildkite/steps/test-coverage-report.sh +++ b/.buildkite/steps/test-coverage-report.sh @@ -2,5 +2,5 @@ set -euo pipefail echo 'Producing coverage report' -go tool covdata textfmt -i "coverage" -o cover.out +go tool covdata textfmt -i "$1" -o cover.out go tool cover -html cover.out -o cover.html diff --git a/.buildkite/steps/tests.sh b/.buildkite/steps/tests.sh index 9ee0706bbd..eb949f116a 100755 --- a/.buildkite/steps/tests.sh +++ b/.buildkite/steps/tests.sh @@ -4,6 +4,11 @@ set -euo pipefail go version echo arch is "$(uname -m)" +RACE='' +if [[ $* == *-race* ]] ; then + RACE='-race' +fi + export BUILDKITE_TEST_ENGINE_SUITE_SLUG=buildkite-agent export BUILDKITE_TEST_ENGINE_TEST_RUNNER=gotest export BUILDKITE_TEST_ENGINE_RESULT_PATH="junit-${BUILDKITE_JOB_ID}.xml" @@ -13,8 +18,8 @@ if [[ "$(go env GOOS)" == "windows" ]]; then # need a Windows VM to debug. 
export BUILDKITE_TEST_ENGINE_TEST_CMD="go tool gotestsum --junitfile={{resultPath}} -- -count=1 $* {{packages}}" else - mkdir -p coverage - COVERAGE_DIR="$PWD/coverage" + COVERAGE_DIR="${PWD}/coverage-$(go env GOOS)-$(go env GOARCH)${RACE}" + mkdir -p "${COVERAGE_DIR}" export BUILDKITE_TEST_ENGINE_TEST_CMD="go tool gotestsum --junitfile={{resultPath}} -- -count=1 -cover $* {{packages}} -test.gocoverdir=${COVERAGE_DIR}" fi diff --git a/agent/agent_configuration.go b/agent/agent_configuration.go index 3929b743b9..664c373753 100644 --- a/agent/agent_configuration.go +++ b/agent/agent_configuration.go @@ -69,4 +69,6 @@ type AgentConfiguration struct { TraceContextEncoding string DisableWarningsFor []string AllowMultipartArtifactUpload bool + + PingMode string } diff --git a/agent/agent_pool.go b/agent/agent_pool.go index 064b344a88..cb0f75393e 100644 --- a/agent/agent_pool.go +++ b/agent/agent_pool.go @@ -8,6 +8,7 @@ import ( "net/http" "strconv" "sync" + "time" "github.com/buildkite/agent/v3/logger" "github.com/buildkite/agent/v3/status" @@ -17,13 +18,15 @@ import ( // AgentPool manages multiple parallel AgentWorkers. type AgentPool struct { - workers []*AgentWorker + workers []*AgentWorker + idleTimeout time.Duration } // NewAgentPool returns a new AgentPool. 
-func NewAgentPool(workers []*AgentWorker) *AgentPool { +func NewAgentPool(workers []*AgentWorker, config *AgentConfiguration) *AgentPool { return &AgentPool{ - workers: workers, + workers: workers, + idleTimeout: config.DisconnectAfterIdleTimeout, } } @@ -58,7 +61,7 @@ func (r *AgentPool) Start(ctx context.Context) error { defer done() setStat("🏃 Spawning workers...") - idleMon := newIdleMonitor(len(r.workers)) + idleMon := NewIdleMonitor(ctx, len(r.workers), r.idleTimeout) errCh := make(chan error) @@ -82,7 +85,7 @@ func (r *AgentPool) Start(ctx context.Context) error { func runWorker(ctx context.Context, worker *AgentWorker, idleMon *idleMonitor) error { agentWorkersStarted.Inc() defer agentWorkersEnded.Inc() - defer idleMon.markDead(worker) + defer idleMon.MarkDead(worker) // Connect the worker to the API if err := worker.Connect(ctx); err != nil { diff --git a/agent/agent_worker.go b/agent/agent_worker.go index d988792c71..c218af65c2 100644 --- a/agent/agent_worker.go +++ b/agent/agent_worker.go @@ -5,15 +5,14 @@ import ( "errors" "fmt" "io" - "math/rand/v2" "net/http" "sync" "sync/atomic" "time" + "connectrpc.com/connect" "github.com/buildkite/agent/v3/api" "github.com/buildkite/agent/v3/core" - "github.com/buildkite/agent/v3/internal/ptr" "github.com/buildkite/agent/v3/logger" "github.com/buildkite/agent/v3/metrics" "github.com/buildkite/agent/v3/process" @@ -163,9 +162,22 @@ func (e *errUnrecoverable) Error() string { return fmt.Sprintf("%s failed with unrecoverable status: %s, mesage: %q", e.action, status, e.err) } +// See https://connectrpc.com/docs/protocol/#http-to-error-code +var codeUnrecoverable = map[connect.Code]bool{ + connect.CodeInternal: true, // 400 + connect.CodeUnauthenticated: true, // 401 + connect.CodePermissionDenied: true, // 403 + connect.CodeUnimplemented: true, // 404 + // All other codes are implicitly false, but particularly: + // Unavailable (429, 502, 503, 504) and Unknown (all other HTTP statuses). 
+} + func isUnrecoverable(err error) bool { var u *errUnrecoverable - return errors.As(err, &u) + if errors.As(err, &u) { + return true + } + return codeUnrecoverable[connect.CodeOf(err)] } func (e *errUnrecoverable) Unwrap() error { @@ -216,7 +228,7 @@ func (a *AgentWorker) statusCallback(context.Context) (any, error) { }, nil } -// Starts the agent worker +// Start starts the agent worker. func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr error) { // Record the start time for max agent lifetime tracking a.startTime = time.Now() @@ -233,14 +245,14 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr } defer a.metricsCollector.Stop() //nolint:errcheck // Best-effort cleanup - // errCh receives 1 value from the heartbeat loop, and 1 from the ping loop. - errCh := make(chan error, 2) + // There are as many as 4 different loops that send 1 error here each. + errCh := make(chan error, 4) // Use this context to control the heartbeat loop. heartbeatCtx, stopHeartbeats := context.WithCancel(ctx) defer stopHeartbeats() - // Start the heartbeat loop but don't wait for it to return. + // Start the heartbeat loop but don't wait for it to return (yet). go func() { errCh <- a.runHeartbeatLoop(heartbeatCtx) }() @@ -250,12 +262,6 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr // (Why run a ping loop at all? To find out if the agent is paused, which // affects whether it terminates after the job.) if a.agentConfiguration.AcquireJob != "" { - // When in acquisition mode, there can't be any agents, so - // there's really no point in letting the idle monitor know - // we're busy, but it's probably a good thing to do for good - // measure. 
- idleMon.markBusy(a) - if err := a.AcquireAndRunJob(ctx, a.agentConfiguration.AcquireJob); err != nil { // If the job acquisition was rejected, we can exit with an error // so that supervisor knows that the job was not acquired due to the job being rejected. @@ -276,252 +282,86 @@ func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr } } - // Start the ping loop and block until it has stopped. - errCh <- a.runPingLoop(ctx, idleMon) - - // The ping loop has ended, so stop the heartbeat loop. - stopHeartbeats() + // The baton avoids the ping loop pinging Buildkite when the streaming loop + // is healthy, but allows the ping loop to take over from the streaming loop + // quickly when it becomes unhealthy. + bat := newBaton() - // Block until both loops have returned, then join the errors. - // (Note that errors.Join does the right thing with nil.) - // Both loops are context aware, so no need to wait on ctx here. - return errors.Join(<-errCh, <-errCh) -} + // More channels to enable communication between the various loops. 
+ fromPingLoopCh := make(chan actionMessage) // ping loop to action handler + fromStreamingLoopCh := make(chan actionMessage) // streaming loop to debouncer + fromDebouncerCh := make(chan actionMessage) // debouncer to action handler -func (a *AgentWorker) runHeartbeatLoop(ctx context.Context) error { - ctx, setStat, _ := status.AddSimpleItem(ctx, "Heartbeat loop") - defer setStat("💔 Heartbeat loop stopped!") - setStat("🏃 Starting...") - - heartbeatInterval := time.Second * time.Duration(a.agent.HeartbeatInterval) - heartbeatTicker := time.NewTicker(heartbeatInterval) - defer heartbeatTicker.Stop() - for { - setStat("😴 Sleeping for a bit") - select { - case <-heartbeatTicker.C: - setStat("❤️ Sending heartbeat") - if err := a.Heartbeat(ctx); err != nil { - if isUnrecoverable(err) { - a.logger.Error("%s", err) - // unrecoverable heartbeat failure also stops ping loop - a.StopUngracefully() - return err - } - - // Get the last heartbeat time to the nearest microsecond - a.stats.Lock() - if a.stats.lastHeartbeat.IsZero() { - a.logger.Error("Failed to heartbeat %s. Will try again in %v. (No heartbeat yet)", - err, heartbeatInterval) - } else { - a.logger.Error("Failed to heartbeat %s. Will try again in %v. (Last successful was %v ago)", - err, heartbeatInterval, time.Since(a.stats.lastHeartbeat)) - } - a.stats.Unlock() - } + // Start the loops and block until they have all stopped. + // Based on configuration, we have our choice of ping loop, + // streaming loop+debouncer loop, or both. + var wg sync.WaitGroup - case <-ctx.Done(): - a.logger.Debug("Stopping heartbeats due to context cancel") - // An alternative to returning nil would be ctx.Err(), but we use - // the context for ordinary termination of this loop. - // A context cancellation from outside the agent worker would still - // be reflected in the value returned by the ping loop return. 
- return nil + pingLoop := func() { + defer wg.Done() + errCh <- a.runPingLoop(ctx, bat, fromPingLoopCh) + } + streamingLoop := func() { + defer wg.Done() + err := a.runStreamingPingLoop(ctx, fromStreamingLoopCh) + if a.agentConfiguration.PingMode != "streaming-only" { + // If the ping mode is streaming-only, then an unrecoverable failure + // in the streaming loop should be reported. + // Otherwise, it should fall back to the ping loop and carry on, + // and if that also has an unrecoverable failure we can report that. + // Said another way: + // Streaming is best-effort but preferred, unless in streaming-only + // mode (it's the only available option). + err = nil } + errCh <- err } -} - -// runPingLoop runs the loop that pings Buildkite for work. It does all critical -// agent things. -// The lifetime of an agent is the lifetime of the ping loop. -func (a *AgentWorker) runPingLoop(ctx context.Context, idleMon *idleMonitor) error { - ctx, setStat, _ := status.AddSimpleItem(ctx, "Ping loop") - defer setStat("🛑 Ping loop stopped!") - setStat("🏃 Starting...") - - // Create the ticker - pingInterval := time.Second * time.Duration(a.agent.PingInterval) - pingTicker := time.NewTicker(pingInterval) - defer pingTicker.Stop() - - // testTriggerCh will normally block forever, and so will not affect the for/select loop. - var testTriggerCh chan struct{} - if a.noWaitBetweenPingsForTesting { - // a closed channel will unblock the for/select instantly, for zero-delay ping loop testing. - testTriggerCh = make(chan struct{}) - close(testTriggerCh) + debouncerLoop := func() { + defer wg.Done() + errCh <- a.runDebouncer(ctx, bat, fromDebouncerCh, fromStreamingLoopCh) } - // On the first iteration, skip waiting for the pingTicker. - // This doesn't skip the jitter, though. 
- skipTicker := make(chan struct{}, 1) - skipTicker <- struct{}{} - - a.logger.Info("Waiting for instructions...") - - ranJob := false - wasPaused := false - - // Continue this loop until one of: - // * the context is cancelled - // * the stop channel is closed (a.Stop) - // * the agent is in acquire mode and the ping action isn't "pause" - // * the agent is in disconnect-after-job mode, the job is finished, and the - // ping action isn't "pause", - // * the agent is in disconnect-after-idle-timeout mode, has been idle for - // longer than the idle timeout, and the ping action isn't "pause". - // * the agent has exceeded its disconnect-after-uptime and the ping action isn't "pause". - for { - startWait := time.Now() - setStat("😴 Waiting until next ping interval tick") - select { - case <-testTriggerCh: - // instant receive from closed chan when noWaitBetweenPingsForTesting is true - case <-skipTicker: - // continue below - case <-pingTicker.C: - // continue below - case <-a.stop: - a.logger.Debug("Stopping pings due to agent stop") - return nil - case <-ctx.Done(): - a.logger.Debug("Stopping pings due to context cancel") - return ctx.Err() - } - - // Within the interval, wait a random amount of time to avoid - // spontaneous synchronisation across agents. 
- jitter := rand.N(pingInterval) - setStat(fmt.Sprintf("🫨 Jittering for %v", jitter)) - select { - case <-testTriggerCh: - // instant receive from closed chan when noWaitBetweenPingsForTesting is true - case <-time.After(jitter): - // continue below - case <-a.stop: - a.logger.Debug("Stopping pings due to agent stop") - return nil - case <-ctx.Done(): - a.logger.Debug("Stopping pings due to context cancel") - return ctx.Err() - } - pingWaitDurations.Observe(time.Since(startWait).Seconds()) - - setStat("📡 Pinging Buildkite for instructions") - pingsSent.Inc() - startPing := time.Now() - job, action, err := a.Ping(ctx) - if err != nil { - pingErrors.Inc() - if isUnrecoverable(err) { - a.logger.Error("%v", err) - return err - } - a.logger.Warn("%v", err) - } - pingDurations.Observe(time.Since(startPing).Seconds()) - - pingActions.WithLabelValues(action).Inc() - switch action { - case "disconnect": - a.StopUngracefully() - return nil - - case "pause": - // An agent is not dispatched any jobs while it is paused, but the - // paused agent is expected to remain alive and pinging for - // instructions. - // *This includes acquire-job and disconnect-after-idle-timeout.* - wasPaused = true - continue - } - - // At this point, action was neither "disconnect" nor "pause". - if wasPaused { - a.logger.Info("Agent has resumed after being paused") - wasPaused = false - } - - // Exit after acquire-job. - // For acquire-job agents, registration sets ignore-in-dispatches=true, - // so job should be nil. If not nil, complain. - if a.agentConfiguration.AcquireJob != "" { - if job != nil { - a.logger.Error("Agent ping dispatched a job (id %q) but agent is in acquire-job mode!", job.ID) - } - return nil - } - - // Exit after disconnect-after-job. Finishing the job sets - // ignore-in-dispatches=true, so job should be nil. If not, complain. 
- if ranJob && a.agentConfiguration.DisconnectAfterJob { - if job != nil { - a.logger.Error("Agent ping dispatched a job (id %q) but agent is in disconnect-after-job mode (and already ran a job)!", job.ID) - } - a.logger.Info("Job ran, and disconnect-after-job is enabled. Disconnecting...") - return nil - } - - // Exit after disconnect-after-uptime is exceeded. - if maxUptime := a.agentConfiguration.DisconnectAfterUptime; maxUptime > 0 { - if time.Since(a.startTime) >= maxUptime { - if job != nil { - a.logger.Error("Agent ping dispatched a job (id %q) but agent has exceeded max uptime of %v!", job.ID, maxUptime) - } - a.logger.Info("Agent has exceeded max uptime of %v. Disconnecting...", maxUptime) - return nil - } - } - - // Note that Ping only returns a job if err == nil. - if job == nil { - idleTimeout := a.agentConfiguration.DisconnectAfterIdleTimeout - if idleTimeout == 0 { - // No job and no idle timeout. - continue - } + var loops []func() + switch a.agentConfiguration.PingMode { + case "", "auto": + loops = []func(){pingLoop, streamingLoop, debouncerLoop} + bat.Acquire(actorDebouncer) - // This ensures agents that never receive a job are still tracked - // by the idle monitor and can properly trigger disconnect-after-idle-timeout. - idleMon.markIdle(a) + case "ping-only": + loops = []func(){pingLoop} + fromDebouncerCh = nil // prevent action loop listening to streaming side - // Exit if every agent has been idle for at least the timeout. - if idleMon.shouldExit(idleTimeout) { - a.logger.Info("All agents have been idle for at least %v. Disconnecting...", idleTimeout) - return nil - } + case "streaming-only": + loops = []func(){streamingLoop, debouncerLoop} + fromPingLoopCh = nil // prevent action loop listening to ping side + bat.Acquire(actorDebouncer) + } - // Not idle enough to exit. Wait and ping again. - continue - } + // There's always an action handler. 
+ actionLoop := func() { + defer wg.Done() + errCh <- a.runActionLoop(ctx, idleMon, fromPingLoopCh, fromDebouncerCh) + } + loops = append(loops, actionLoop) - setStat("💼 Accepting job") + // Go loops! + wg.Add(len(loops)) + for _, l := range loops { + go l() + } + wg.Wait() - // Runs the job, only errors if something goes wrong - if err := a.AcceptAndRunJob(ctx, job, idleMon); err != nil { - a.logger.Error("%v", err) - setStat(fmt.Sprintf("✅ Finished job with error: %v", err)) - continue - } + // The source loops have ended, so stop the heartbeat loop. + stopHeartbeats() - ranJob = true - - // Observation: jobs are rarely the last within a pipeline, - // thus if this worker just completed a job, - // there is likely another immediately available. - // Skip waiting for the ping interval until - // a ping without a job has occurred, - // but in exchange, ensure the next ping must wait at least a full - // pingInterval to avoid too much server load. - pingTicker.Reset(pingInterval) - select { - case skipTicker <- struct{}{}: - // Ticker will be skipped - default: - // We're already skipping the ticker, don't block. - } + // Block until all loops have returned, then join the errors. + // (Note that errors.Join does the right thing with nil.) + // All loops are context aware, so no need to wait on ctx here. + var err error + for range len(loops) + 1 { // loops + heartbeat loop + err = errors.Join(err, <-errCh) } + return err } func (a *AgentWorker) internalStop() { @@ -619,76 +459,6 @@ func (a *AgentWorker) Heartbeat(ctx context.Context) error { return nil } -// Performs a ping that checks Buildkite for a job or action to take -// Returns a job, or nil if none is found -func (a *AgentWorker) Ping(ctx context.Context) (job *api.Job, action string, err error) { - ping, resp, pingErr := a.apiClient.Ping(ctx) - // wait a minute, where's my if err != nil block? 
TL;DR look for pingErr ~20 lines down - // the api client returns an error if the response code isn't a 2xx, but there's still information in resp and ping - // that we need to check out to do special handling for specific error codes or messages in the response body - // once we've done that, we can do the error handling for pingErr - - if ping != nil { - // Is there a message that should be shown in the logs? - if ping.Message != "" { - a.logger.Info(ping.Message) - } - - action = ping.Action - } - - if pingErr != nil { - // If the ping has a non-retryable status, we have to kill the agent, there's no way of recovering - // The reason we do this after the disconnect check is because the backend can (and does) send disconnect actions in - // responses with non-retryable statuses - if resp != nil && !api.IsRetryableStatus(resp) { - return nil, action, &errUnrecoverable{action: "Ping", response: resp, err: pingErr} - } - - // Get the last ping time to the nearest microsecond - a.stats.Lock() - defer a.stats.Unlock() - - // If a ping fails, we don't really care, because it'll - // ping again after the interval. - if a.stats.lastPing.IsZero() { - return nil, action, fmt.Errorf("Failed to ping: %w (No successful ping yet)", pingErr) - } else { - return nil, action, fmt.Errorf("Failed to ping: %w (Last successful was %v ago)", pingErr, time.Since(a.stats.lastPing)) - } - } - - // Track a timestamp for the successful ping for better errors - a.stats.Lock() - a.stats.lastPing = time.Now() - a.stats.Unlock() - - // Should we switch endpoints? - if ping.Endpoint != "" && ping.Endpoint != a.agent.Endpoint { - newAPIClient := a.apiClient.FromPing(ping) - - // Before switching to the new one, do a ping test to make sure it's - // valid. 
If it is, switch and carry on, otherwise ignore the switch - newPing, _, err := newAPIClient.Ping(ctx) - if err != nil { - a.logger.Warn("Failed to ping the new endpoint %s - ignoring switch for now (%s)", ping.Endpoint, err) - } else { - // Replace the APIClient and process the new ping - a.apiClient = newAPIClient - a.agent.Endpoint = ping.Endpoint - ping = newPing - } - } - - // If we don't have a job, there's nothing to do! - // If we're paused, job should be nil, but in case it isn't, ignore it. - if ping.Job == nil || action == "pause" { - return nil, action, nil - } - - return ping.Job, action, nil -} - // AcquireAndRunJob attempts to acquire a job an run it. It will retry at after the // server determined interval (from the Retry-After response header) if the job is in the waiting // state. If the job is in an unassignable state, it will return an error immediately. @@ -717,51 +487,6 @@ func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error return a.RunJob(ctx, job, nil) } -// Accepts a job and runs it, only returns an error if something goes wrong -func (a *AgentWorker) AcceptAndRunJob(ctx context.Context, job *api.Job, idleMon *idleMonitor) error { - a.logger.Info("Assigned job %s. Accepting...", job.ID) - - // An agent is busy during a job, and idle when the job is done. - idleMon.markBusy(a) - defer idleMon.markIdle(a) - - // Accept the job. We'll retry on connection related issues, but if - // Buildkite returns a 422 or 500 for example, we'll just bail out, - // re-ping, and try the whole process again. 
- r := roko.NewRetrier( - roko.WithMaxAttempts(30), - roko.WithStrategy(roko.Constant(5*time.Second)), - ) - - accepted, err := roko.DoFunc(ctx, r, func(r *roko.Retrier) (*api.Job, error) { - accepted, _, err := a.apiClient.AcceptJob(ctx, job) - if err != nil { - if api.IsRetryableError(err) { - a.logger.Warn("%s (%s)", err, r) - } else { - a.logger.Warn("Buildkite rejected the call to accept the job (%s)", err) - r.Break() - } - } - return accepted, err - }) - - // If `accepted` is nil, then the job was never accepted - if accepted == nil { - return fmt.Errorf("Failed to accept job: %w", err) - } - - // If we're disconnecting-after-job, signal back to Buildkite that we're not - // interested in jobs after this one. - var ignoreAgentInDispatches *bool - if a.agentConfiguration.DisconnectAfterJob { - ignoreAgentInDispatches = ptr.To(true) - } - - // Now that we've accepted the job, let's run it - return a.RunJob(ctx, accepted, ignoreAgentInDispatches) -} - func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job, ignoreAgentInDispatches *bool) error { a.setBusy(acceptResponse.ID) defer a.setIdle() @@ -831,3 +556,32 @@ func (a *AgentWorker) healthHandler() http.HandlerFunc { } } } + +// Internal error values that should not escape to the user. +var ( + // internalStop is used when stopping. + internalStop = errors.New("stop") + + // internalBreak is used to stop an inner loop but continue + // an outer loop. + internalBreak = errors.New("break") +) + +const ( + actorPingLoop = "ping" + actorDebouncer = "debouncer" +) + +type actionMessage struct { + // Details of the action to execute + action, jobID string + + // Results of the action + errCh chan<- error + + // Secret internal action between the streaming loop and debouncer: + // set to true when the streaming loop is unhealthy + // and the baton should be released so the ping loop is unblocked + // (once the current action is completed, if that's the case). 
+ unhealthy bool +} diff --git a/agent/agent_worker_action.go b/agent/agent_worker_action.go new file mode 100644 index 0000000000..e24ef057df --- /dev/null +++ b/agent/agent_worker_action.go @@ -0,0 +1,229 @@ +package agent + +import ( + "context" + "fmt" + "time" + + "github.com/buildkite/agent/v3/api" + "github.com/buildkite/agent/v3/internal/ptr" + "github.com/buildkite/agent/v3/status" + "github.com/buildkite/roko" +) + +func (a *AgentWorker) runActionLoop(ctx context.Context, idleMon *idleMonitor, fromPingLoop, fromDebouncer <-chan actionMessage) error { + a.logger.Debug("[runActionLoop] Starting") + defer a.logger.Debug("[runActionLoop] Exiting") + + // Once this loop terminates, there's no point continuing the others, + // because nothing remains to execute their actions. + defer a.internalStop() + + ctx, setStat, _ := status.AddSimpleItem(ctx, "Action loop") + defer setStat("🛑 Action loop stopped!") + setStat("🏃 Starting...") + + // Start timing disconnect-after-uptime, if configured. + var disconnectAfterUptime <-chan time.Time + maxUptime := a.agentConfiguration.DisconnectAfterUptime + if maxUptime > 0 { + disconnectAfterUptime = time.After(maxUptime) + } + + exitWhenNotPaused := false // the next time the action isn't "pause", exit + ranJob := false + paused := false + + for { + // Did both sources of actions terminate? Then we're done too. 
+ if fromPingLoop == nil && fromDebouncer == nil { + a.logger.Debug("[runActionLoop] All action sources channels are closed, exiting") + return nil + } + + // Wait for one of the following: + // - an action + // - the context to be cancelled + // - the agent is stopping (a.stop) + // - the idle monitor has declared we're all exiting + // (if DisconnectAfterIdleTimeout is configured & we're not paused) + // - disconnect after uptime + // (if DisconnectAfterUptime is configured & we're not paused) + a.logger.Debug("[runActionLoop] Waiting for an action...") + setStat("⌚️ Waiting for an action...") + var msg actionMessage + select { + case m, open := <-fromPingLoop: + if !open { + // Setting to nil prevents this branch of the select from + // happening again. + fromPingLoop = nil + continue + } + a.logger.Debug("[runActionLoop] Got action %q, jobID %q from ping loop", m.action, m.jobID) + msg = m + // continue below + + case m, open := <-fromDebouncer: + if !open { + fromDebouncer = nil + continue + } + a.logger.Debug("[runActionLoop] Got action %q, jobID %q from streaming loop debouncer", m.action, m.jobID) + msg = m + // continue below + + case <-ctx.Done(): + a.logger.Debug("[runActionLoop] Stopping due to context cancel") + return ctx.Err() + + case <-a.stop: + a.logger.Debug("[runActionLoop] Stopping due to agent stop") + return nil + + case <-disconnectAfterUptime: + a.logger.Info("Agent has exceeded max uptime of %v", maxUptime) + if paused { + // Wait to be unpaused before exiting + a.logger.Info("Awaiting resume before disconnecting...") + exitWhenNotPaused = true + continue + } + a.logger.Info("Disconnecting...") + return nil + + case <-idleMon.Exiting(): + // This should only happen if the agent isn't paused. + // (Pausedness is a kind of non-idleness.) + a.logger.Info("All agents have been idle for at least %v. Disconnecting...", idleMon.idleTimeout) + return nil + } + + // Let's handle the action! 
+ a.logger.Debug("[runActionLoop] Performing action %q, jobID %q", msg.action, msg.jobID) + setStat(fmt.Sprintf("🧑‍🍳 Performing %q action...", msg.action)) + pingActions.WithLabelValues(msg.action).Inc() + + // In cases where we need to disconnect, *don't* send on msg.errCh, + // in order to force the <-a.stop branch in the other loops. + // Otherwise, be sure to `close(msg.errCh)`! + switch msg.action { + case "disconnect": + a.logger.Debug("[runActionLoop] Stopping action loop due to disconnect action") + return nil + + case "pause": + // An agent is not dispatched any jobs while it is paused, but the + // paused agent is expected to remain alive and pinging for + // instructions. + // *This includes acquire-job and disconnect-after-idle-timeout.* + a.logger.Debug("[runActionLoop] Entering pause state") + paused = true + // For the purposes of deciding whether or not to exit, + // pausedness is a kind of non-idleness. + // If there's also no job, agent is marked as idle below. + idleMon.MarkBusy(a) + close(msg.errCh) + continue + } + + // At this point, action was neither "disconnect" nor "pause". + if exitWhenNotPaused { + a.logger.Debug("[runActionLoop] Stopping action loop because exitWhenNotPaused is true") + return nil + } + if paused { + // We're not paused any more! Log a helpful message. + a.logger.Info("Agent has resumed after being paused") + paused = false + } + + // For acquire-job agents, registration sets ignore-in-dispatches=true, + // so jobID should be empty. If not, complain. + if a.agentConfiguration.AcquireJob != "" { + if msg.jobID != "" { + a.logger.Error("Agent ping dispatched a job (id %q) but agent is in acquire-job mode! Ignoring the new job", msg.jobID) + } + // Disconnect after acquire-job. + return nil + } + + // In disconnect-after-job mode, finishing the job sets + // ignore-in-dispatches=true. So jobID should be empty. If not, complain. 
+ if ranJob && a.agentConfiguration.DisconnectAfterJob { + if msg.jobID != "" { + a.logger.Error("Agent ping dispatched a job (id %q) but agent is in disconnect-after-job mode (and already ran a job)! Ignoring the new job", msg.jobID) + } + a.logger.Info("Job ran, and disconnect-after-job is enabled. Disconnecting...") + return nil + } + + // If the jobID is empty, then it's an idle message + if msg.jobID == "" { + // This ensures agents that never receive a job are still tracked + // by the idle monitor and can properly trigger disconnect-after-idle-timeout. + idleMon.MarkIdle(a) + close(msg.errCh) + continue + } + + setStat("💼 Accepting job") + + // Runs the job, only errors if something goes wrong + if err := a.AcceptAndRunJob(ctx, msg.jobID, idleMon); err != nil { + a.logger.Error("%v", err) + setStat(fmt.Sprintf("✅ Finished job with error: %v", err)) + msg.errCh <- err // so the ping loop can do something special + close(msg.errCh) + continue + } + + ranJob = true + close(msg.errCh) + } +} + +// Accepts a job and runs it, only returns an error if something goes wrong +func (a *AgentWorker) AcceptAndRunJob(ctx context.Context, jobID string, idleMon *idleMonitor) error { + a.logger.Info("Assigned job %s. Accepting...", jobID) + + // An agent is busy during a job, and idle when the job is done. + idleMon.MarkBusy(a) + defer idleMon.MarkIdle(a) + + // Accept the job. We'll retry on connection related issues, but if + // Buildkite returns a 422 or 500 for example, we'll just bail out, + // re-ping, and try the whole process again. 
+ r := roko.NewRetrier( + roko.WithMaxAttempts(30), + roko.WithStrategy(roko.Constant(5*time.Second)), + ) + + accepted, err := roko.DoFunc(ctx, r, func(r *roko.Retrier) (*api.Job, error) { + accepted, _, err := a.apiClient.AcceptJob(ctx, jobID) + if err != nil { + if api.IsRetryableError(err) { + a.logger.Warn("%s (%s)", err, r) + } else { + a.logger.Warn("Buildkite rejected the call to accept the job (%s)", err) + r.Break() + } + } + return accepted, err + }) + + // If `accepted` is nil, then the job was never accepted + if accepted == nil { + return fmt.Errorf("Failed to accept job: %w", err) + } + + // If we're disconnecting-after-job, signal back to Buildkite that we're not + // interested in jobs after this one. + var ignoreAgentInDispatches *bool + if a.agentConfiguration.DisconnectAfterJob { + ignoreAgentInDispatches = ptr.To(true) + } + + // Now that we've accepted the job, let's run it + return a.RunJob(ctx, accepted, ignoreAgentInDispatches) +} diff --git a/agent/agent_worker_debouncer.go b/agent/agent_worker_debouncer.go new file mode 100644 index 0000000000..f09cc0b000 --- /dev/null +++ b/agent/agent_worker_debouncer.go @@ -0,0 +1,144 @@ +package agent + +import ( + "context" +) + +// runDebouncer is an event debouncing loop between the streaming loop and the +// action handler loop. +// +// There are two *big* differences between the streaming loop and the +// classical ping loop: +// +// 1. When pings happen, they happen "regularly". Actions are only sent +// in response. But when the streaming loop receives messages is up to +// the backend. +// 2. Pings can be put on hold while a job is running. But streaming +// messages can keep arriving during a job. +// +// Firstly, we want to get back to receiving from the stream +// as soon as possible, rather than blocking until the action is handled, +// so that the stream remains healthy. +// Secondly, we need to reduce consecutive messages down to only 0 or 1 correct +// next action(s) following a job. 
+// For example, say during a job someone clicks "pause" and "resume" +// and "pause" again on this agent. This may cause three distinct +// events to be sent to the streaming loop. If we pass them all on to the +// action handler directly, then the "resume" may cause the agent to +// exit in a one-shot mode, even though the second "pause" means the +// user actually *did* want the agent to be paused. +func (a *AgentWorker) runDebouncer(ctx context.Context, bat *baton, outCh chan<- actionMessage, inCh <-chan actionMessage) error { + a.logger.Debug("[runDebouncer] Starting") + defer a.logger.Debug("[runDebouncer] Exiting") + + // When the debouncer returns, close the output channel to let the next + // loop know to stop listening to it. + defer close(outCh) + + // We begin not running an action. + actionInProgress := false + + // We begin holding the baton, ensure it is released when we exit. + defer func() { + a.logger.Debug("[runDebouncer] Releasing the baton") + bat.Release(actorDebouncer) + }() + + // lastActionResult is closed when the action handler is done handling the + // last action we sent. + // It starts nil because at the beginning, there is no previous action. + var lastActionResult chan error + + // pending is the next message to send, when able. + var pending *actionMessage + + // Is the stream healthy? + // If so, take the baton (which blocks the ping loop). + // If not, return the baton (unblocking the ping loop). + // Returning the baton may have to wait for the current action to complete. + healthy := true + + for { + select { + case <-a.stop: + a.logger.Debug("[runDebouncer] Stopping due to agent stop") + return nil + case <-ctx.Done(): + a.logger.Debug("[runDebouncer] Stopping due to context cancel") + return ctx.Err() + + case <-iif(healthy, bat.Acquire(actorDebouncer)): // if the stream is healthy, take the baton if available + a.logger.Debug("[runDebouncer] Took the baton") + // We now have the baton! 
+ // continue below to send any pending message, if able + + case msg, open := <-inCh: // streaming loop has produced an event + if !open { + a.logger.Debug("[runDebouncer] Stopping due to input channel closing") + return nil + } + + healthy = !msg.unhealthy + + if !healthy { + a.logger.Debug("[runDebouncer] Streaming loop is unhealthy") + + // It is not healthy, so release the baton as soon as we can + // (when the current action is done). + if !actionInProgress { + // We can release the baton now. + a.logger.Debug("[runDebouncer] Releasing the baton") + bat.Release(actorDebouncer) + } + break // out of the select + } + + // The next message to send is, currently, always the most recent + // healthy message. + pending = &msg + + // continue below to send it + + case <-lastActionResult: // most recent action has completed + a.logger.Debug("[runDebouncer] Last action has completed") + // Set the channel variable to nil so we don't spinloop. + // (Operations on a nil channel block forever.) + lastActionResult = nil + actionInProgress = false + + // continue below to send a pending message + } + + // If we're healthy, have the baton, there's no action in progress, + // and there's a pending message, then send that message. + if !healthy || !bat.HeldBy(actorDebouncer) || actionInProgress || pending == nil { + continue + } + a.logger.Debug("[runDebouncer] Sending action %q, jobID %q", pending.action, pending.jobID) + lastActionResult = make(chan error) + pending.errCh = lastActionResult + select { + case outCh <- *pending: + // sent! + pending = nil + actionInProgress = true + case <-a.stop: + a.logger.Debug("[runDebouncer] Stopping due to agent stop") + return nil + case <-ctx.Done(): + a.logger.Debug("[runDebouncer] Stopping due to context cancel") + return ctx.Err() + } + } +} + +// iif returns t if b is true, otherwise it returns the zero value of T. +// This is useful for enabling or disabling a select case based on a test +// evaluated at the start of the select. 
+func iif[T any](b bool, t T) T { + if b { + return t + } + var f T + return f +} diff --git a/agent/agent_worker_heartbeat.go b/agent/agent_worker_heartbeat.go new file mode 100644 index 0000000000..ca4f751b6c --- /dev/null +++ b/agent/agent_worker_heartbeat.go @@ -0,0 +1,52 @@ +package agent + +import ( + "context" + "time" + + "github.com/buildkite/agent/v3/status" +) + +func (a *AgentWorker) runHeartbeatLoop(ctx context.Context) error { + ctx, setStat, _ := status.AddSimpleItem(ctx, "Heartbeat loop") + defer setStat("💔 Heartbeat loop stopped!") + setStat("🏃 Starting...") + + heartbeatInterval := time.Second * time.Duration(a.agent.HeartbeatInterval) + heartbeatTicker := time.NewTicker(heartbeatInterval) + defer heartbeatTicker.Stop() + for { + setStat("😴 Sleeping for a bit") + select { + case <-heartbeatTicker.C: + setStat("❤️ Sending heartbeat") + if err := a.Heartbeat(ctx); err != nil { + if isUnrecoverable(err) { + a.logger.Error("%s", err) + // unrecoverable heartbeat failure also stops everything else + a.StopUngracefully() + return err + } + + // Get the last heartbeat time to the nearest microsecond + a.stats.Lock() + if a.stats.lastHeartbeat.IsZero() { + a.logger.Error("Failed to heartbeat %s. Will try again in %v. (No heartbeat yet)", + err, heartbeatInterval) + } else { + a.logger.Error("Failed to heartbeat %s. Will try again in %v. (Last successful was %v ago)", + err, heartbeatInterval, time.Since(a.stats.lastHeartbeat)) + } + a.stats.Unlock() + } + + case <-ctx.Done(): + a.logger.Debug("Stopping heartbeats due to context cancel") + // An alternative to returning nil would be ctx.Err(), but we use + // the context for ordinary termination of this loop. + // A context cancellation from outside the agent worker would still + // be reflected in the value returned by the ping loop return. 
+ return nil + } + } +} diff --git a/agent/agent_worker_ping.go b/agent/agent_worker_ping.go new file mode 100644 index 0000000000..7e4957ad03 --- /dev/null +++ b/agent/agent_worker_ping.go @@ -0,0 +1,271 @@ +package agent + +import ( + "context" + "fmt" + "math/rand/v2" + "time" + + "github.com/buildkite/agent/v3/api" + "github.com/buildkite/agent/v3/status" +) + +// runPingLoop runs the (classical) loop that pings Buildkite for work. +func (a *AgentWorker) runPingLoop(ctx context.Context, bat *baton, outCh chan<- actionMessage) error { + a.logger.Debug("[runPingLoop] Starting") + defer a.logger.Debug("[runPingLoop] Exiting") + + // When this loop returns, close the channel to let the action handler loop + // stop listening for actions from it. + defer close(outCh) + + ctx, setStat, _ := status.AddSimpleItem(ctx, "Ping loop") + defer setStat("🛑 Ping loop stopped!") + setStat("🏃 Starting...") + + pingInterval := time.Second * time.Duration(a.agent.PingInterval) + state := &pingLoopState{ + AgentWorker: a, + bat: bat, + outCh: outCh, + pingInterval: pingInterval, + pingTicker: time.NewTicker(pingInterval), + skipWait: make(chan struct{}, 1), + setStat: setStat, + } + defer state.pingTicker.Stop() + + // On the first iteration, skip waiting for the pingTicker. + // One buffered value won't skip the jitter, though. + state.skipWait <- struct{}{} + if a.noWaitBetweenPingsForTesting { + // a closed channel will unblock the for/select instantly, for zero-delay ping loop testing. 
+ close(state.skipWait) + } + + a.logger.Info("Waiting for instructions...") + + for { + startWait := time.Now() + a.logger.Debug("[runPingLoop] Waiting for pingTicker") + setStat("😴 Waiting until next ping interval tick") + select { + case <-state.skipWait: + // continue below + case <-state.pingTicker.C: + // continue below + case <-a.stop: + a.logger.Debug("[runPingLoop] Stopping due to agent stop") + return nil + case <-ctx.Done(): + a.logger.Debug("[runPingLoop] Stopping due to context cancel") + return ctx.Err() + } + + // Within the interval, wait a random amount of time to avoid + // spontaneous synchronisation across agents. + jitter := rand.N(pingInterval) + a.logger.Debug("[runPingLoop] Waiting for jitter %v", jitter) + setStat(fmt.Sprintf("🫨 Jittering for %v", jitter)) + select { + case <-state.skipWait: + // continue below + case <-time.After(jitter): + // continue below + case <-a.stop: + a.logger.Debug("[runPingLoop] Stopping due to agent stop") + return nil + case <-ctx.Done(): + a.logger.Debug("[runPingLoop] Stopping due to context cancel") + return ctx.Err() + } + pingWaitDurations.Observe(time.Since(startWait).Seconds()) + + err := state.pingLoopInner(ctx) + if err == internalStop { + return nil + } + if err != nil { + return err + } + } +} + +// pingLoopState exists to pass parameters to pingLoopInner. +type pingLoopState struct { + *AgentWorker + bat *baton + outCh chan<- actionMessage + setStat func(string) + pingTicker *time.Ticker + pingInterval time.Duration + skipWait chan struct{} +} + +func (a *pingLoopState) pingLoopInner(ctx context.Context) error { + // Wait until the baton is available. If this takes forever, that's + // a good thing because it should mean the streaming loop is + // healthy. + // Once acquired, only release the baton after any work is complete, + // to prevent the streaming loop from resuming control until then, + // but we always release the baton, because the streaming loop is + // preferred. 
+ a.logger.Debug("[runPingLoop] Waiting for baton") + select { + case <-a.bat.Acquire(actorPingLoop): // the baton is ours! + a.logger.Debug("[runPingLoop] Acquired the baton") + defer func() { // <- this is why the ping loop body is in a func + a.logger.Debug("[runPingLoop] Releasing the baton") + a.bat.Release(actorPingLoop) + }() + + case <-a.stop: + a.logger.Debug("[runPingLoop] Stopping due to agent stop") + return internalStop + case <-ctx.Done(): + a.logger.Debug("[runPingLoop] Stopping due to context cancel") + return ctx.Err() + } + + a.logger.Debug("[runPingLoop] Pinging buildkite for instructions") + a.setStat("📡 Pinging Buildkite for instructions") + pingsSent.Inc() + startPing := time.Now() + jobID, action, err := a.Ping(ctx) + if err != nil { + pingErrors.Inc() + if isUnrecoverable(err) { + a.logger.Error("%v", err) + return err + } + a.logger.Warn("%v", err) + } + pingDurations.Observe(time.Since(startPing).Seconds()) + + a.logger.Debug("[runPingLoop] Sending action") + + // Send the action to the action loop + errCh := make(chan error) + msg := actionMessage{ + action: action, + jobID: jobID, + errCh: errCh, + } + select { + case a.outCh <- msg: + // sent! + case <-a.stop: + a.logger.Debug("[runPingLoop] Stopping due to agent stop") + return internalStop + case <-ctx.Done(): + a.logger.Debug("[runPingLoop] Stopping due to context cancel") + return ctx.Err() + } + + // Wait for completion + select { + case err := <-errCh: + if err != nil || jobID == "" { + // We don't terminate the ping loop just because the + // action (usually a job) has failed. + return nil + } + if a.noWaitBetweenPingsForTesting { + // Don't bother resetting the ticker, + // don't try to send on a closed channel (skipWait). + return nil + } + // A job ran (or was at least started) successfully. + // Observation: jobs are rarely the last within a pipeline, + // thus if this worker just completed a job, + // there is likely another immediately available. 
+ // Skip waiting for the ping interval until + // a ping without a job has occurred, + // but in exchange, ensure the next ping must wait at least a full + // pingInterval to avoid too much server load. + a.pingTicker.Reset(a.pingInterval) + select { + case a.skipWait <- struct{}{}: + // Ticker will be skipped + default: + // We're already skipping the ticker, don't block. + } + return nil + case <-a.stop: + a.logger.Debug("[runPingLoop] Stopping due to agent stop") + return internalStop + case <-ctx.Done(): + a.logger.Debug("[runPingLoop] Stopping due to context cancel") + return ctx.Err() + } +} + +// Performs a ping that checks Buildkite for a job or action to take +// Returns a job, or nil if none is found +func (a *AgentWorker) Ping(ctx context.Context) (jobID, action string, err error) { + ping, resp, pingErr := a.apiClient.Ping(ctx) + // wait a minute, where's my if err != nil block? TL;DR look for pingErr ~20 lines down + // the api client returns an error if the response code isn't a 2xx, but there's still information in resp and ping + // that we need to check out to do special handling for specific error codes or messages in the response body + // once we've done that, we can do the error handling for pingErr + + if ping != nil { + // Is there a message that should be shown in the logs? 
+ if ping.Message != "" { + a.logger.Info(ping.Message) + } + + action = ping.Action + } + + if pingErr != nil { + // If the ping has a non-retryable status, we have to kill the agent, there's no way of recovering + // The reason we do this after the disconnect check is because the backend can (and does) send disconnect actions in + // responses with non-retryable statuses + if resp != nil && !api.IsRetryableStatus(resp) { + return "", action, &errUnrecoverable{action: "Ping", response: resp, err: pingErr} + } + + // Get the last ping time to the nearest microsecond + a.stats.Lock() + defer a.stats.Unlock() + + // If a ping fails, we don't really care, because it'll + // ping again after the interval. + if a.stats.lastPing.IsZero() { + return "", action, fmt.Errorf("Failed to ping: %w (No successful ping yet)", pingErr) + } else { + return "", action, fmt.Errorf("Failed to ping: %w (Last successful was %v ago)", pingErr, time.Since(a.stats.lastPing)) + } + } + + // Track a timestamp for the successful ping for better errors + a.stats.Lock() + a.stats.lastPing = time.Now() + a.stats.Unlock() + + // Should we switch endpoints? + if ping.Endpoint != "" && ping.Endpoint != a.agent.Endpoint { + newAPIClient := a.apiClient.FromPing(ping) + + // Before switching to the new one, do a ping test to make sure it's + // valid. If it is, switch and carry on, otherwise ignore the switch + newPing, _, err := newAPIClient.Ping(ctx) + if err != nil { + a.logger.Warn("Failed to ping the new endpoint %s - ignoring switch for now (%s)", ping.Endpoint, err) + } else { + // Replace the APIClient and process the new ping + a.apiClient = newAPIClient + a.agent.Endpoint = ping.Endpoint + ping = newPing + } + } + + // If we don't have a job, there's nothing to do! + // If we're paused, job should be nil, but in case it isn't, ignore it. 
+ if ping.Job == nil || action == "pause" { + return "", action, nil + } + + return ping.Job.ID, action, nil +} diff --git a/agent/agent_worker_streaming.go b/agent/agent_worker_streaming.go new file mode 100644 index 0000000000..3107c3d505 --- /dev/null +++ b/agent/agent_worker_streaming.go @@ -0,0 +1,264 @@ +package agent + +import ( + "context" + "fmt" + "math/rand/v2" + "time" + + "connectrpc.com/connect" + agentedgev1 "github.com/buildkite/agent/v3/api/proto/gen" + "github.com/buildkite/agent/v3/status" +) + +// runStreamingPingLoop runs the streaming loop. It is best-effort +// (allowed to fail and fall back to the regular ping loop) but when it works +// it is preferred because there is less waiting around. +func (a *AgentWorker) runStreamingPingLoop(ctx context.Context, outCh chan<- actionMessage) error { + a.logger.Debug("[runStreamingPingLoop] Starting") + defer a.logger.Debug("[runStreamingPingLoop] Exiting") + + // When this loop returns, close the channel to let the next loop stop + // listening to it. + defer close(outCh) + + ctx, setStat, _ := status.AddSimpleItem(ctx, "Streaming ping loop") + defer setStat("🛑 Ping stream loop stopped!") + setStat("🏃 Starting...") + + // The stream Receive call blocks until a message is received - we can't + // select on it. streamCtx exists to end the stream on agent stop. + streamCtx, cancelStream := context.WithCancel(ctx) + defer cancelStream() + go func() { + <-a.stop + cancelStream() + }() + + // Because we expect the streaming connection to last much longer than a + // ping, we should use a different doctrine compared with the ping loop. + // + // This loop is a repeated fuzzed exponential backoff: + // + // If the connection is successful, once it closes, the next connection will + // begin after a minimal jitter. + // While the connection fails, each attempt will jitter over double the + // previous interval before attempting reconnection. 
+ // + // Note: This _could_ be implemented with an infinite loop containing a roko + // retrier, but it looked a bit messier to me. + initialWindow := 1 * time.Second + + var skipWait chan struct{} + if a.noWaitBetweenPingsForTesting { + // a closed channel will unblock the select instantly, for zero-delay loop testing. + skipWait = make(chan struct{}) + close(skipWait) + } + + state := &streamLoopState{ + AgentWorker: a, + outCh: outCh, + setStat: setStat, + } + + for { + // Backoff exponentially, up to initialWindow * 2^6. + // (Repeated failures may jitter up to 64 seconds between attempts.) + window := initialWindow << min(state.attempts, 6) + windowEnd := time.After(window) + state.attempts++ + + // Within the interval, wait a random amount of time to avoid + // spontaneous synchronisation across agents. + jitter := rand.N(window) + setStat(fmt.Sprintf("🫨 Jittering for %v (max %v)", jitter, window)) + a.logger.Debug("[runStreamingPingLoop] Waiting for jitter %v (max %v)", jitter, window) + select { + case <-skipWait: + // continue below + case <-time.After(jitter): + // continue below + case <-a.stop: + a.logger.Debug("[runStreamingPingLoop] Stopping due to agent stop") + return nil + case <-ctx.Done(): + a.logger.Debug("[runStreamingPingLoop] Stopping due to context cancel") + return ctx.Err() + } + + err := state.startStream(ctx, streamCtx) + if err == internalStop { + return nil + } + if err != nil { + return err + } + + // Wait the remainder of the jitter window. + // Windowing the jitter this way avoids statistical effects. + // (If we started a new jitter right away, the Nth request would + // happen at an approximately Normally-distributed time after start, + // because that's a sum of random variables each with finite variance. + // Central Limit Theorem! We'd rather have a uniform distribution + // over a window.) 
+ setStat("😴 Waiting for remainder of window") + a.logger.Debug("[runStreamingPingLoop] Waiting for remainder of window") + select { + case <-skipWait: + // continue next iteration + case <-windowEnd: + // continue next iteration + case <-a.stop: + a.logger.Debug("[runStreamingPingLoop] Stopping due to agent stop") + return nil + case <-ctx.Done(): + a.logger.Debug("[runStreamingPingLoop] Stopping due to context cancel") + return ctx.Err() + } + } +} + +// streamLoopState holds stream loop specific state for startStream +// and streamLoopInner. +type streamLoopState struct { + *AgentWorker + outCh chan<- actionMessage + attempts int + firstMsg bool + setStat func(string) +} + +// startStream attempts 1 connection to the stream and handles its messages. +func (a *streamLoopState) startStream(ctx, streamCtx context.Context) error { + a.setStat(fmt.Sprintf("📱 Connecting to ping stream (attempt %d)...", a.attempts)) + a.logger.Debug("[runStreamingPingLoop] Connecting (attempt %d)", a.attempts) + stream, err := a.apiClient.StreamPings(streamCtx, a.agent.UUID) + if err != nil { + a.logger.Error("Connection to ping stream failed: %v", err) + if isUnrecoverable(err) { + a.logger.Error("Stopping ping stream because the error is unrecoverable") + return err + } + // Fast fallback to the ping loop + a.logger.Debug("[runStreamingPingLoop] Becoming unhealthy") + select { + case a.outCh <- actionMessage{unhealthy: true}: + a.logger.Debug("[runStreamingPingLoop] Unhealthy message sent to debouncer") + // sent! 
+ case <-a.stop: + a.logger.Debug("[runStreamingPingLoop] Stopping due to agent stop") + return internalStop + case <-ctx.Done(): + a.logger.Debug("[runStreamingPingLoop] Stopping due to context cancel") + return ctx.Err() + } + return nil // continue outer streaming loop + } + + a.firstMsg = true // used for the "connection established" log + + a.setStat("🏞️ Streaming actions from Buildkite") + a.logger.Debug("[runStreamingPingLoop] Waiting for a message...") + for msg, streamErr := range stream { + err := a.handle(ctx, msg, streamErr) + if err == internalBreak { + break + } + if err == internalStop { + return internalStop + } + if err != nil { + return err + } + } + return nil +} + +func (a *streamLoopState) handle(ctx context.Context, msg *agentedgev1.StreamPingsResponse, streamErr error) error { + a.logger.Debug("[runStreamingPingLoop] Received msg %v, err %v", msg, streamErr) + + var amsg actionMessage + switch { + case streamErr != nil: + a.logger.Debug("[runStreamingPingLoop] Connection to ping stream failed or ended: %v", streamErr) + if isUnrecoverable(streamErr) { + a.logger.Error("Stopping ping stream loop because the error is unrecoverable: %v", streamErr) + return streamErr + } + // Stay healthy if the error is deadline-exceeded. + // (The connection timed out, which we want to happen every so often). + if connect.CodeOf(streamErr) == connect.CodeDeadlineExceeded { + a.logger.Debug("[runStreamingPingLoop] Breaking stream loop to reconnect following deadline-exceeded") + return internalBreak + } + // It's some other error. Go unhealthy, which unblocks the ping loop. 
+ a.logger.Debug("[runStreamingPingLoop] Becoming unhealthy") + amsg.unhealthy = true + + case msg == nil: + a.logger.Error("Ping stream yielded a nil message, so assuming the stream is broken") + a.logger.Debug("[runStreamingPingLoop] Becoming unhealthy") + amsg.unhealthy = true + + default: + if a.firstMsg { + a.logger.Info("Ping stream connection established") + a.firstMsg = false + } + + switch act := msg.Action.(type) { + case *agentedgev1.StreamPingsResponse_Resume: // a.k.a. "idle" + // continue below + + case *agentedgev1.StreamPingsResponse_Pause: + if reason := act.Pause.GetReason(); reason != "" { + a.logger.Info("Pause reason: %s", reason) + } + amsg.action = "pause" + + case *agentedgev1.StreamPingsResponse_Disconnect: + if reason := act.Disconnect.GetReason(); reason != "" { + a.logger.Info("Disconnect reason: %s", reason) + } + amsg.action = "disconnect" + + case *agentedgev1.StreamPingsResponse_JobAssigned: + amsg.jobID = act.JobAssigned.GetJob().GetId() + if amsg.jobID == "" { + a.logger.Error("Ping stream yielded a JobAssigned message with nil job or empty job ID, so assuming the stream is broken") + a.logger.Debug("[runStreamingPingLoop] Becoming unhealthy") + amsg.unhealthy = true + } + } + } + + // Send the message to the debouncer. + select { + case a.outCh <- amsg: + a.logger.Debug("[runStreamingPingLoop] Message sent to debouncer") + // sent! + case <-a.stop: + a.logger.Debug("[runStreamingPingLoop] Stopping due to agent stop") + return internalStop + case <-ctx.Done(): + a.logger.Debug("[runStreamingPingLoop] Stopping due to context cancel") + return ctx.Err() + } + + // In case the server sends a disconnect but doesn't close the + // stream, be sure to exit. 
+ if amsg.action == "disconnect" { + a.logger.Debug("[runStreamingPingLoop] Stopping due to disconnect action") + a.internalStop() + return internalStop + } + + if amsg.unhealthy { + a.logger.Debug("[runStreamingPingLoop] Breaking stream loop to reconnect because the stream is unhealthy") + return internalBreak + } + // Stream is healthy, reset the retry counter + a.attempts = 0 + return nil +} diff --git a/agent/agent_worker_test.go b/agent/agent_worker_test.go index ce33d0c878..e6d4ddcd7a 100644 --- a/agent/agent_worker_test.go +++ b/agent/agent_worker_test.go @@ -16,7 +16,9 @@ import ( "testing" "time" + "connectrpc.com/connect" "github.com/buildkite/agent/v3/api" + agentedgev1 "github.com/buildkite/agent/v3/api/proto/gen" "github.com/buildkite/agent/v3/core" "github.com/buildkite/agent/v3/logger" "github.com/buildkite/agent/v3/metrics" @@ -49,8 +51,6 @@ func TestDisconnect(t *testing.T) { })) defer server.Close() - ctx := context.Background() - apiClient := api.NewClient(logger.Discard, api.Config{ Endpoint: server.URL, Token: "llamas", @@ -73,7 +73,7 @@ func TestDisconnect(t *testing.T) { agentConfiguration: AgentConfiguration{}, } - err := worker.Disconnect(ctx) + err := worker.Disconnect(t.Context()) require.NoError(t, err) assert.Equal(t, []string{"[info] Disconnecting...", "[info] Disconnected"}, l.Messages) @@ -100,8 +100,6 @@ func TestDisconnectRetry(t *testing.T) { })) defer server.Close() - ctx := context.Background() - apiClient := api.NewClient(logger.Discard, api.Config{ Endpoint: server.URL, Token: "llamas", @@ -126,7 +124,7 @@ func TestDisconnectRetry(t *testing.T) { agentConfiguration: AgentConfiguration{}, } - err := worker.Disconnect(ctx) + err := worker.Disconnect(t.Context()) assert.NoError(t, err) // 2 failed attempts sleep 1 second each @@ -142,9 +140,6 @@ func TestDisconnectRetry(t *testing.T) { func TestAcquireJobReturnsWrappedError_WhenServerResponds422(t *testing.T) { t.Parallel() - ctx, cancel := 
context.WithCancel(context.Background()) - t.Cleanup(cancel) - jobID := "some-uuid" server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { @@ -175,7 +170,7 @@ func TestAcquireJobReturnsWrappedError_WhenServerResponds422(t *testing.T) { agentConfiguration: AgentConfiguration{}, } - err := worker.AcquireAndRunJob(ctx, jobID) + err := worker.AcquireAndRunJob(t.Context(), jobID) if !errors.Is(err, core.ErrJobAcquisitionRejected) { t.Fatalf("expected worker.AcquireAndRunJob(%q) = core.ErrJobAcquisitionRejected, got %v", jobID, err) } @@ -184,9 +179,6 @@ func TestAcquireJobReturnsWrappedError_WhenServerResponds422(t *testing.T) { func TestAcquireAndRunJobWaiting(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { switch req.URL.Path { case "/jobs/waitinguuid/acquire": @@ -233,7 +225,7 @@ func TestAcquireAndRunJobWaiting(t *testing.T) { agentConfiguration: AgentConfiguration{}, } - err := worker.AcquireAndRunJob(ctx, "waitinguuid") + err := worker.AcquireAndRunJob(t.Context(), "waitinguuid") assert.ErrorContains(t, err, "423") if !errors.Is(err, core.ErrJobLocked) { @@ -251,9 +243,6 @@ func TestAcquireAndRunJobWaiting(t *testing.T) { func TestAgentWorker_Start_AcquireJob_JobAcquisitionRejected(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { switch req.URL.Path { case "/jobs/waitinguuid/acquire": @@ -317,11 +306,9 @@ func TestAgentWorker_Start_AcquireJob_JobAcquisitionRejected(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - idleMonitor := newIdleMonitor(1) - // we expect the worker to try to acquire the job, but fail with ErrJobAcquisitionRejected // because the server returns a 422 Unprocessable Entity. 
- err := worker.Start(ctx, idleMonitor) + err := worker.Start(t.Context(), nil) if !errors.Is(err, core.ErrJobAcquisitionRejected) { t.Fatalf("expected worker.AcquireAndRunJob(%q) = core.ErrJobAcquisitionRejected, got %v", jobID, err) } @@ -330,9 +317,6 @@ func TestAgentWorker_Start_AcquireJob_JobAcquisitionRejected(t *testing.T) { func TestAgentWorker_Start_AcquireJob_Pause_Unpause(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - buildPath := filepath.Join(os.TempDir(), t.Name(), "build") hooksPath := filepath.Join(os.TempDir(), t.Name(), "hooks") if err := errors.Join(os.MkdirAll(buildPath, 0o777), os.MkdirAll(hooksPath, 0o777)); err != nil { @@ -400,9 +384,7 @@ func TestAgentWorker_Start_AcquireJob_Pause_Unpause(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - idleMonitor := newIdleMonitor(1) - - if err := worker.Start(ctx, idleMonitor); err != nil { + if err := worker.Start(t.Context(), nil); err != nil { t.Errorf("worker.Start() = %v", err) } @@ -417,9 +399,6 @@ func TestAgentWorker_Start_AcquireJob_Pause_Unpause(t *testing.T) { func TestAgentWorker_DisconnectAfterJob_Start_Pause_Unpause(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - buildPath := filepath.Join(os.TempDir(), t.Name(), "build") hooksPath := filepath.Join(os.TempDir(), t.Name(), "hooks") if err := errors.Join(os.MkdirAll(buildPath, 0o777), os.MkdirAll(hooksPath, 0o777)); err != nil { @@ -494,9 +473,7 @@ func TestAgentWorker_DisconnectAfterJob_Start_Pause_Unpause(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - idleMonitor := newIdleMonitor(1) - - if err := worker.Start(ctx, idleMonitor); err != nil { + if err := worker.Start(t.Context(), nil); err != nil { t.Errorf("worker.Start() = %v", err) } @@ -514,9 +491,6 @@ func TestAgentWorker_DisconnectAfterJob_Start_Pause_Unpause(t *testing.T) { func TestAgentWorker_DisconnectAfterUptime(t *testing.T) { 
t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - buildPath := filepath.Join(os.TempDir(), t.Name(), "build") hooksPath := filepath.Join(os.TempDir(), t.Name(), "hooks") if err := errors.Join(os.MkdirAll(buildPath, 0o777), os.MkdirAll(hooksPath, 0o777)); err != nil { @@ -581,12 +555,10 @@ func TestAgentWorker_DisconnectAfterUptime(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - idleMonitor := newIdleMonitor(1) - // Record start time startTime := time.Now() - if err := worker.Start(ctx, idleMonitor); err != nil { + if err := worker.Start(t.Context(), nil); err != nil { t.Errorf("worker.Start() = %v", err) } @@ -596,14 +568,9 @@ func TestAgentWorker_DisconnectAfterUptime(t *testing.T) { t.Errorf("Agent should have disconnected after ~1 second, but took %v", elapsed) } - // The agent should have made at least one ping before disconnecting - if pingCount == 0 { - t.Error("Agent should have made at least one ping before disconnecting") - } - - // The agent should have made at least one ping and should have disconnected - // due to max uptime being exceeded. The important thing is that the agent - // disconnected properly with the uptime check, which we verified above. + // The agent may not get around to pinging before the uptime is exceeded. + // The important thing is that the agent disconnected properly with the + // uptime check, which we verified above. } func TestAgentWorker_SetEndpointDuringRegistration(t *testing.T) { @@ -611,9 +578,6 @@ func TestAgentWorker_SetEndpointDuringRegistration(t *testing.T) { // is passed into agent.NewAgentWorker(...), so we'll just test the response handling. 
t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - server := NewFakeAPIServer() defer server.Close() targetEndpoint := server.URL @@ -665,7 +629,7 @@ func TestAgentWorker_SetEndpointDuringRegistration(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - if err := worker.Start(ctx, newIdleMonitor(1)); err != nil { + if err := worker.Start(t.Context(), nil); err != nil { t.Errorf("worker.Start() = %v", err) } @@ -677,9 +641,6 @@ func TestAgentWorker_SetEndpointDuringRegistration(t *testing.T) { func TestAgentWorker_UpdateEndpointDuringPing(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - const agentSessionToken = "alpacas" // the first endpoint, to be redirected from @@ -754,7 +715,7 @@ func TestAgentWorker_UpdateEndpointDuringPing(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - if err := worker.Start(ctx, newIdleMonitor(1)); err != nil { + if err := worker.Start(t.Context(), nil); err != nil { t.Errorf("worker.Start() = %v", err) } @@ -767,9 +728,6 @@ func TestAgentWorker_UpdateEndpointDuringPing(t *testing.T) { func TestAgentWorker_UpdateEndpointDuringPing_FailAndRevert(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - const agentSessionToken = "alpacas" // A working endpoint for the original ping @@ -834,7 +792,7 @@ func TestAgentWorker_UpdateEndpointDuringPing_FailAndRevert(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - if err := worker.Start(ctx, newIdleMonitor(1)); err != nil { + if err := worker.Start(t.Context(), nil); err != nil { t.Errorf("worker.Start() = %v", err) } @@ -846,9 +804,6 @@ func TestAgentWorker_UpdateEndpointDuringPing_FailAndRevert(t *testing.T) { func TestAgentWorker_SetRequestHeadersDuringRegistration(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - const headerKey = 
"Buildkite-Hello" const headerValue = "world" @@ -888,7 +843,7 @@ func TestAgentWorker_SetRequestHeadersDuringRegistration(t *testing.T) { }) client := &core.Client{APIClient: apiClient, Logger: l} // the underlying api.Client will capture & store the server-specified request headers here... - reg, err := client.Register(ctx, api.AgentRegisterRequest{}) + reg, err := client.Register(t.Context(), api.AgentRegisterRequest{}) if err != nil { t.Fatalf("failed to register: %v", err) } @@ -903,7 +858,7 @@ func TestAgentWorker_SetRequestHeadersDuringRegistration(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - if err := worker.Start(ctx, newIdleMonitor(1)); err != nil { + if err := worker.Start(t.Context(), nil); err != nil { t.Errorf("worker.Start() = %v", err) } @@ -915,9 +870,6 @@ func TestAgentWorker_SetRequestHeadersDuringRegistration(t *testing.T) { func TestAgentWorker_UpdateRequestHeadersDuringPing(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - const agentSessionToken = "alpacas" server := NewFakeAPIServer() @@ -991,7 +943,7 @@ func TestAgentWorker_UpdateRequestHeadersDuringPing(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - if err := worker.Start(ctx, newIdleMonitor(1)); err != nil { + if err := worker.Start(t.Context(), nil); err != nil { t.Errorf("worker.Start() = %v", err) } @@ -1003,9 +955,6 @@ func TestAgentWorker_UpdateRequestHeadersDuringPing(t *testing.T) { func TestAgentWorker_UnrecoverableErrorInPing(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - const agentSessionToken = "alpacas" server := NewFakeAPIServer() @@ -1046,7 +995,7 @@ func TestAgentWorker_UnrecoverableErrorInPing(t *testing.T) { ) worker.noWaitBetweenPingsForTesting = true - if err := worker.Start(ctx, newIdleMonitor(1)); !isUnrecoverable(err) { + if err := worker.Start(t.Context(), nil); !isUnrecoverable(err) { t.Errorf("worker.Start() = 
%v, want an unrecoverable error", err) } @@ -1054,3 +1003,384 @@ func TestAgentWorker_UnrecoverableErrorInPing(t *testing.T) { t.Errorf("agent.Pings = %d, want %d", got, want) } } + +func TestAgentWorker_Streaming_Disconnect(t *testing.T) { + t.Parallel() + + server := NewFakeAPIServer(WithStreaming) + defer server.Close() + + const agentSessionToken = "alpacas" + agent := server.AddAgent(agentSessionToken) + agent.PingHandler = func(*http.Request) (api.Ping, error) { + return api.Ping{}, errors.New("too many pings") + } + go func() { + agent.PingStream <- &agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_Disconnect{}, + } + close(agent.PingStream) + }() + + l := logger.NewConsoleLogger(logger.NewTestPrinter(t), func(int) {}) + + worker := NewAgentWorker( + l, + &api.AgentRegisterResponse{ + UUID: uuid.New().String(), + Name: "agent-1", + AccessToken: agentSessionToken, + Endpoint: server.URL, + PingInterval: 1, + JobStatusInterval: 5, + HeartbeatInterval: 60, + }, + metrics.NewCollector(logger.Discard, metrics.CollectorConfig{}), + api.NewClient(logger.Discard, api.Config{ + Endpoint: server.URL, + Token: "llamas", + }), + AgentWorkerConfig{}, + ) + + if err := worker.Start(t.Context(), nil); err != nil { + t.Errorf("worker.Start() error = %v, want nil", err) + } + if got, want := agent.Pings, 0; got != want { + t.Errorf("agent.Pings = %d, want %d", got, want) + } +} + +func TestAgentWorker_Streaming_Pause_Resume_Disconnect(t *testing.T) { + t.Parallel() + + server := NewFakeAPIServer(WithStreaming) + defer server.Close() + + const agentSessionToken = "alpacas" + agent := server.AddAgent(agentSessionToken) + agent.PingHandler = func(*http.Request) (api.Ping, error) { + return api.Ping{}, errors.New("too many pings") + } + go func() { + agent.PingStream <- &agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_Pause{ + Pause: &agentedgev1.PauseAction{ + Reason: "Agent has been paused", + }, + }, + } + 
agent.PingStream <- &agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_Resume{}, + } + agent.PingStream <- &agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_Disconnect{}, + } + close(agent.PingStream) + }() + + l := logger.NewConsoleLogger(logger.NewTestPrinter(t), func(int) {}) + + worker := NewAgentWorker( + l, + &api.AgentRegisterResponse{ + UUID: uuid.New().String(), + Name: "agent-1", + AccessToken: agentSessionToken, + Endpoint: server.URL, + PingInterval: 1, + JobStatusInterval: 5, + HeartbeatInterval: 60, + }, + metrics.NewCollector(logger.Discard, metrics.CollectorConfig{}), + api.NewClient(logger.Discard, api.Config{ + Endpoint: server.URL, + Token: "llamas", + }), + AgentWorkerConfig{}, + ) + + if err := worker.Start(t.Context(), nil); err != nil { + t.Errorf("worker.Start() error = %v, want nil", err) + } + if got, want := agent.Pings, 0; got != want { + t.Errorf("agent.Pings = %d, want %d", got, want) + } +} + +func TestAgentWorker_Streaming_Start_AcquireJob_Pause_Unpause(t *testing.T) { + t.Parallel() + + buildPath := filepath.Join(os.TempDir(), t.Name(), "build") + hooksPath := filepath.Join(os.TempDir(), t.Name(), "hooks") + if err := errors.Join(os.MkdirAll(buildPath, 0o777), os.MkdirAll(hooksPath, 0o777)); err != nil { + t.Fatalf("Couldn't create directories: %v", err) + } + t.Cleanup(func() { + os.RemoveAll(filepath.Join(os.TempDir(), t.Name())) //nolint:errcheck // Best-effort cleanup + }) + + server := NewFakeAPIServer(WithStreaming) + defer server.Close() + + job := server.AddJob(map[string]string{ + "BUILDKITE_COMMAND": "echo echo", + }) + + // Pre-register the agent. 
+ const agentSessionToken = "alpacas" + agent := server.AddAgent(agentSessionToken) + agent.PingHandler = func(*http.Request) (api.Ping, error) { + return api.Ping{}, errors.New("too many pings") + } + go func() { + agent.PingStream <- &agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_Pause{}, + } + agent.PingStream <- &agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_Resume{}, + } + close(agent.PingStream) + }() + + l := logger.NewConsoleLogger(logger.NewTestPrinter(t), func(int) {}) + + worker := NewAgentWorker( + l, + &api.AgentRegisterResponse{ + UUID: uuid.New().String(), + Name: "agent-1", + AccessToken: agentSessionToken, + Endpoint: server.URL, + PingInterval: 1, + JobStatusInterval: 1, + HeartbeatInterval: 10, + }, + metrics.NewCollector(logger.Discard, metrics.CollectorConfig{}), + api.NewClient(logger.Discard, api.Config{ + Endpoint: server.URL, + Token: "llamas", + }), + AgentWorkerConfig{ + SpawnIndex: 1, + AgentConfiguration: AgentConfiguration{ + BootstrapScript: dummyBootstrap, + BuildPath: buildPath, + HooksPath: hooksPath, + AcquireJob: job.Job.ID, + }, + }, + ) + worker.noWaitBetweenPingsForTesting = true + + if err := worker.Start(t.Context(), nil); err != nil { + t.Errorf("worker.Start() = %v", err) + } + + if got, want := agent.Pings, 0; got != want { + t.Errorf("agent.Pings = %d, want %d", got, want) + } + if got, want := job.State, JobStateFinished; got != want { + t.Errorf("job.State = %q, want %q", got, want) + } +} + +func TestAgentWorker_Streaming_DisconnectAfterJob_Start_Pause_Unpause(t *testing.T) { + t.Parallel() + + buildPath := filepath.Join(os.TempDir(), t.Name(), "build") + hooksPath := filepath.Join(os.TempDir(), t.Name(), "hooks") + if err := errors.Join(os.MkdirAll(buildPath, 0o777), os.MkdirAll(hooksPath, 0o777)); err != nil { + t.Fatalf("Couldn't create directories: %v", err) + } + t.Cleanup(func() { + os.RemoveAll(filepath.Join(os.TempDir(), t.Name())) //nolint:errcheck 
// Best-effort cleanup + }) + + server := NewFakeAPIServer(WithStreaming) + defer server.Close() + + job := server.AddJob(map[string]string{ + "BUILDKITE_COMMAND": "echo echo", + }) + + // Pre-register the agent. + const agentSessionToken = "alpacas" + agent := server.AddAgent(agentSessionToken) + agent.PingHandler = func(*http.Request) (api.Ping, error) { + return api.Ping{}, errors.New("too many pings") + } + go func() { + agent.PingStream <- &agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_JobAssigned{ + JobAssigned: &agentedgev1.JobAssignedAction{ + Job: &agentedgev1.Job{ + Id: job.Job.ID, + }, + }, + }, + } + agent.PingStream <- &agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_Pause{}, + } + agent.PingStream <- &agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_Resume{}, + } + close(agent.PingStream) + }() + + server.Assign(agent, job) + + l := logger.NewConsoleLogger(logger.NewTestPrinter(t), func(int) {}) + + worker := NewAgentWorker( + l, + &api.AgentRegisterResponse{ + UUID: uuid.New().String(), + Name: "agent-1", + AccessToken: "alpacas", + Endpoint: server.URL, + PingInterval: 1, + JobStatusInterval: 1, + HeartbeatInterval: 10, + }, + metrics.NewCollector(logger.Discard, metrics.CollectorConfig{}), + api.NewClient(logger.Discard, api.Config{ + Endpoint: server.URL, + Token: "llamas", + }), + AgentWorkerConfig{ + SpawnIndex: 1, + AgentConfiguration: AgentConfiguration{ + BootstrapScript: dummyBootstrap, + BuildPath: buildPath, + HooksPath: hooksPath, + DisconnectAfterJob: true, + }, + }, + ) + worker.noWaitBetweenPingsForTesting = true + + if err := worker.Start(t.Context(), nil); err != nil { + t.Errorf("worker.Start() = %v", err) + } + + if got, want := agent.Pings, 0; got != want { + t.Errorf("agent.Pings = %d, want %d", got, want) + } + if got, want := agent.IgnoreInDispatches, true; got != want { + t.Errorf("agent.IgnoreInDispatches = %t, want %t", got, want) + } + if 
got, want := job.State, JobStateFinished; got != want { + t.Errorf("job.State = %q, want %q", got, want) + } +} + +func TestAgentWorker_Streaming_UnrecoverableError_Fallback(t *testing.T) { + t.Parallel() + + const agentSessionToken = "alpacas" + + server := NewFakeAPIServer(WithStreaming) + defer server.Close() + + agent := server.AddAgent(agentSessionToken) + agent.PingHandler = func(req *http.Request) (api.Ping, error) { + switch agent.Pings { + case 0: + return api.Ping{Action: "disconnect"}, nil + default: + return api.Ping{}, fmt.Errorf("unexpected ping #%d", agent.Pings) + } + } + agent.PingStreamHandler = func(ctx context.Context, r *connect.Request[agentedgev1.StreamPingsRequest], ss *connect.ServerStream[agentedgev1.StreamPingsResponse]) error { + return connect.NewError(connect.CodePermissionDenied, errors.New("flagrant system error")) + } + + l := logger.NewConsoleLogger(logger.NewTestPrinter(t), func(int) {}) + + worker := NewAgentWorker( + l, + &api.AgentRegisterResponse{ + UUID: uuid.New().String(), + Name: "agent-1", + AccessToken: agentSessionToken, + Endpoint: server.URL, + PingInterval: 1, + JobStatusInterval: 5, + HeartbeatInterval: 60, + }, + metrics.NewCollector(logger.Discard, metrics.CollectorConfig{}), + api.NewClient(logger.Discard, api.Config{ + Endpoint: server.URL, + Token: "llamas", + }), + AgentWorkerConfig{}, + ) + worker.noWaitBetweenPingsForTesting = true + + if err := worker.Start(t.Context(), nil); err != nil { + t.Errorf("worker.Start() = %v, want nil", err) + } + + if got, want := agent.Pings, 1; got != want { + t.Errorf("agent.Pings = %d, want %d", got, want) + } +} + +func TestAgentWorker_Streaming_RecoverableError_Fallback_Resume(t *testing.T) { + t.Parallel() + + const agentSessionToken = "alpacas" + + server := NewFakeAPIServer(WithStreaming) + defer server.Close() + + agent := server.AddAgent(agentSessionToken) + // Default ping handler - idle + + connections := 0 + agent.PingStreamHandler = func(ctx context.Context, req 
*connect.Request[agentedgev1.StreamPingsRequest], resp *connect.ServerStream[agentedgev1.StreamPingsResponse]) error { + connections++ + switch connections { + case 1: + return connect.NewError(connect.CodeUnavailable, errors.New("demure system error")) + case 2: + return resp.Send(&agentedgev1.StreamPingsResponse{ + Action: &agentedgev1.StreamPingsResponse_Disconnect{}, + }) + default: + return connect.NewError(connect.CodeInternal, errors.New("too many connections")) + } + } + + l := logger.NewConsoleLogger(logger.NewTestPrinter(t), func(int) {}) + + worker := NewAgentWorker( + l, + &api.AgentRegisterResponse{ + UUID: uuid.New().String(), + Name: "agent-1", + AccessToken: agentSessionToken, + Endpoint: server.URL, + PingInterval: 1, + JobStatusInterval: 5, + HeartbeatInterval: 60, + }, + metrics.NewCollector(logger.Discard, metrics.CollectorConfig{}), + api.NewClient(logger.Discard, api.Config{ + Endpoint: server.URL, + Token: "llamas", + }), + AgentWorkerConfig{}, + ) + worker.noWaitBetweenPingsForTesting = true + + if err := worker.Start(t.Context(), nil); err != nil { + t.Errorf("worker.Start() = %v, want nil", err) + } + + if connections != 2 { + t.Errorf("StreamPings connections = %d, want %d", connections, 2) + } +} diff --git a/agent/baton.go b/agent/baton.go new file mode 100644 index 0000000000..0603935a75 --- /dev/null +++ b/agent/baton.go @@ -0,0 +1,86 @@ +package agent + +import "sync" + +// baton is a channel-based mutex. This allows for using it as part of a select +// statement. +type baton struct { + mu sync.Mutex + holder string + acquire map[string]chan struct{} +} + +// newBaton creates a new baton for sharing among actors, each identified +// by a non-empty string. +func newBaton() *baton { + return &baton{ + acquire: make(map[string]chan struct{}), + } +} + +// HeldBy reports if the actor specified by the argument holds the baton. 
+func (b *baton) HeldBy(by string) bool { + b.mu.Lock() + defer b.mu.Unlock() + return b.holder == by +} + +// Acquire returns a channel that receives when the baton is acquired by the +// acquirer. +func (b *baton) Acquire(by string) <-chan struct{} { + b.mu.Lock() + defer b.mu.Unlock() + + // If there's an existing channel for this actor, reuse it. + ch := b.acquire[by] + if ch == nil { + ch = make(chan struct{}) + } + + // If nothing holds the baton currently, assign it to the caller. + // The caller won't be receiving on the channel until after we + // return it, so make the channel receivable by closing it. + if b.holder == "" { + b.holder = by + close(ch) + delete(b.acquire, by) // in case it is in the map + return ch + } + + // Something holds the baton, so record that this actor is + // waiting for the baton. + b.acquire[by] = ch + return ch +} + +// Release releases the baton, if it is held by the argument. +func (b *baton) Release(by string) { + b.mu.Lock() + defer b.mu.Unlock() + + // Only release if its the same actor, to prevent bugs due + // to double-releasing. + if b.holder != by { + return + } + + // Attempt to pass the baton to anything still waiting for it. + for a, ch := range b.acquire { + delete(b.acquire, a) + select { + case ch <- struct{}{}: + // We were able to send a value to the channel, + // so this actor was still waiting to receive. + // Therefore this actor has acquired the baton. + b.holder = a + return + default: + // This actor has stopped waiting to receive, + // so try another. + } + } + + // Nothing was still waiting on its channel, + // so now nothing holds the baton. 
+ b.holder = "" +} diff --git a/agent/fake_api_server_test.go b/agent/fake_api_server_test.go index 295cd070cb..29e574d4ce 100644 --- a/agent/fake_api_server_test.go +++ b/agent/fake_api_server_test.go @@ -1,6 +1,7 @@ package agent import ( + "context" "encoding/json" "fmt" "io" @@ -10,7 +11,10 @@ import ( "sync" "time" + "connectrpc.com/connect" "github.com/buildkite/agent/v3/api" + agentedgev1 "github.com/buildkite/agent/v3/api/proto/gen" + "github.com/buildkite/agent/v3/api/proto/gen/agentedgev1connect" "github.com/google/uuid" ) @@ -46,6 +50,14 @@ type FakeAgent struct { IgnoreInDispatches bool PingHandler func(*http.Request) (api.Ping, error) + + // PingStream is a simple way of providing streaming responses concurrently. + // It is used for the default handler. + PingStream chan *agentedgev1.StreamPingsResponse + + // PingStreamHandler provides more flexibility in how the streaming request + // is handled. Setting PingStreamHandler overrides the default handler. + PingStreamHandler func(context.Context, *connect.Request[agentedgev1.StreamPingsRequest], *connect.ServerStream[agentedgev1.StreamPingsResponse]) error } // agentJob is just an agent/job tuple. @@ -54,6 +66,8 @@ type agentJob struct { job *FakeJob } +type fakeAPIServerOption = func(*FakeAPIServer, *http.ServeMux) + // FakeAPIServer implements a fake Agent REST API server for testing. type FakeAPIServer struct { *httptest.Server @@ -68,7 +82,7 @@ type FakeAPIServer struct { } // NewFakeAPIServer constructs a new FakeAPIServer for testing. 
-func NewFakeAPIServer() *FakeAPIServer { +func NewFakeAPIServer(opts ...fakeAPIServerOption) *FakeAPIServer { fs := &FakeAPIServer{ agents: make(map[string]*FakeAgent), jobs: make(map[string]*FakeJob), @@ -76,6 +90,9 @@ func NewFakeAPIServer() *FakeAPIServer { registrations: make(map[string]*api.AgentRegisterResponse), } mux := http.NewServeMux() + for _, opt := range opts { + opt(fs, mux) + } mux.HandleFunc("PUT /jobs/{job_uuid}/acquire", fs.handleJobAcquire) mux.HandleFunc("PUT /jobs/{job_uuid}/accept", fs.handleJobAccept) mux.HandleFunc("PUT /jobs/{job_uuid}/start", fs.handleJobStart) @@ -88,10 +105,36 @@ func NewFakeAPIServer() *FakeAPIServer { return fs } +// WithStreaming enables the ping streaming API for the fake server. +func WithStreaming(fs *FakeAPIServer, mux *http.ServeMux) { + mux.Handle(agentedgev1connect.NewAgentEdgeServiceHandler(fs)) +} + +func (fs *FakeAPIServer) StreamPings(ctx context.Context, req *connect.Request[agentedgev1.StreamPingsRequest], resp *connect.ServerStream[agentedgev1.StreamPingsResponse]) error { + auth := req.Header().Get("Authorization") + agent := fs.agentForAuth(auth) + if agent == nil { + return connect.NewError(connect.CodePermissionDenied, fmt.Errorf("invalid Authorization header value %q", auth)) + } + + if agent.PingStreamHandler != nil { + return agent.PingStreamHandler(ctx, req, resp) + } + + for p := range agent.PingStream { + if err := resp.Send(p); err != nil { + return connect.NewError(connect.CodeUnknown, err) + } + } + return nil +} + func (fs *FakeAPIServer) AddAgent(token string) *FakeAgent { fs.agentsMu.Lock() defer fs.agentsMu.Unlock() - a := &FakeAgent{} + a := &FakeAgent{ + PingStream: make(chan *agentedgev1.StreamPingsResponse), + } fs.agents["Token "+token] = a return a } diff --git a/agent/idle_monitor.go b/agent/idle_monitor.go index 1f3743c676..75c3288c39 100644 --- a/agent/idle_monitor.go +++ b/agent/idle_monitor.go @@ -1,7 +1,7 @@ package agent import ( - "sync" + "context" "time" ) @@ -20,69 
+20,146 @@ import ( // -> Idle -- */ type idleMonitor struct { - mu sync.Mutex - exiting bool + // exiting is closed when the idle monitor says all agents should exit + exiting chan struct{} + + // totalAgents is the total number of agents configured to run totalAgents int - idleAt map[*AgentWorker]time.Time + + // idleTimeout is a copy of the DisconnectAfterIdleTimeout value + idleTimeout time.Duration + + // Channels used to update the monitor state + becameIdle chan *AgentWorker + becameBusy chan *AgentWorker + becameDead chan *AgentWorker + + // idleAt tracks when each agent became idle/dead. + // Agents not present in the map are busy. + idleAt map[*AgentWorker]time.Time } -// newIdleMonitor creates a new IdleMonitor. -func newIdleMonitor(totalAgents int) *idleMonitor { - return &idleMonitor{ +// NewIdleMonitor creates a new IdleMonitor. +func NewIdleMonitor(ctx context.Context, totalAgents int, idleTimeout time.Duration) *idleMonitor { + if idleTimeout <= 0 { + // Note that the methods handle a nil receiver safely. + return nil + } + i := &idleMonitor{ + exiting: make(chan struct{}), totalAgents: totalAgents, + idleTimeout: idleTimeout, + becameIdle: make(chan *AgentWorker), + becameBusy: make(chan *AgentWorker), + becameDead: make(chan *AgentWorker), idleAt: make(map[*AgentWorker]time.Time), } + go i.monitor(ctx) + return i } -// shouldExit reports whether all agents are dead or have been idle for at least -// minIdle. If shouldExit returns true, it will return true on all subsequent -// calls. -func (i *idleMonitor) shouldExit(minIdle time.Duration) bool { - i.mu.Lock() - defer i.mu.Unlock() - - // Once the idle monitor decides we're exiting, we're exiting. - if i.exiting { - return true +// monitor is the internal goroutine for handling idleness. +func (i *idleMonitor) monitor(ctx context.Context) { + if i == nil { + return } - // Are all alive agents dead or idle for long enough? 
- idle := 0 - for _, t := range i.idleAt { - if !t.IsZero() && time.Since(t) < minIdle { - return false + // Once the idle monitor returns, all the agents should also exit. + defer close(i.exiting) + + var lastTimeout <-chan time.Time + for { + select { + case <-ctx.Done(): + return + + case <-lastTimeout: + return + + case agent := <-i.becameIdle: + // Idleness is counted from when the agent first became idle. + if _, alreadyIdle := i.idleAt[agent]; alreadyIdle { + break + } + i.idleAt[agent] = time.Now() + + case agent := <-i.becameBusy: + delete(i.idleAt, agent) + + case agent := <-i.becameDead: + i.idleAt[agent] = time.Time{} + } + + // Update the timeout channel based on all the agent states + // Are there any busy agents? Then don't time out. + if len(i.idleAt) < i.totalAgents { + lastTimeout = nil + continue + } + + // They're all idle or dead. Figure out when the timeout should happen. + // If they're all dead, then the timeout happens immediately. + // If at least one is idle and _not_ dead, then the timeout happens + // however much of idleTimeout remains since the agent that most + // recently became idle. + var timeout time.Duration + for _, t := range i.idleAt { + if t.IsZero() { + continue + } + timeout = max(timeout, i.idleTimeout-time.Since(t)) + } + if timeout == 0 { + return } - idle++ + lastTimeout = time.After(timeout) } - if idle < i.totalAgents { - return false +} + +// Exiting returns a channel that is closed when the monitor declares +// all agents should exit. It is safe to use with a nil pointer. +func (i *idleMonitor) Exiting() <-chan struct{} { + if i == nil { + return nil } - i.exiting = true - return true + return i.exiting } -// markIdle marks an agent as idle. -func (i *idleMonitor) markIdle(agent *AgentWorker) { - i.mu.Lock() - defer i.mu.Unlock() - // Allow MarkIdle to be called multiple times without updating the idleAt - // timestamp. - if _, alreadyIdle := i.idleAt[agent]; alreadyIdle { +// MarkIdle marks an agent as idle. 
It is safe to use with a nil pointer. +func (i *idleMonitor) MarkIdle(agent *AgentWorker) { + if i == nil { return } - i.idleAt[agent] = time.Now() + select { + case i.becameIdle <- agent: + // marked as idle + case <-i.exiting: + // no goroutine listening on i.becameIdle + } } -// markDead marks an agent as dead. -func (i *idleMonitor) markDead(agent *AgentWorker) { - i.mu.Lock() - defer i.mu.Unlock() - i.idleAt[agent] = time.Time{} +// MarkDead marks an agent as dead. It is safe to use with a nil pointer. +func (i *idleMonitor) MarkDead(agent *AgentWorker) { + if i == nil { + return + } + select { + case i.becameDead <- agent: + // marked as dead + case <-i.exiting: + // no goroutine listening on i.becameDead + } } -// markBusy marks an agent as busy. -func (i *idleMonitor) markBusy(agent *AgentWorker) { - i.mu.Lock() - defer i.mu.Unlock() - delete(i.idleAt, agent) +// MarkBusy marks an agent as busy. It is safe to use with a nil pointer. +func (i *idleMonitor) MarkBusy(agent *AgentWorker) { + if i == nil { + return + } + select { + case i.becameBusy <- agent: + // marked as busy + case <-i.exiting: + // no goroutine listening on i.becameBusy + } } diff --git a/agent/idle_monitor_test.go b/agent/idle_monitor_test.go new file mode 100644 index 0000000000..db340e98b7 --- /dev/null +++ b/agent/idle_monitor_test.go @@ -0,0 +1,91 @@ +package agent + +import ( + "testing" + "time" +) + +func TestIdleMonitor(t *testing.T) { + t.Parallel() + + idleTimeout := 100 * time.Millisecond + i := NewIdleMonitor(t.Context(), 3, idleTimeout) + + // These "agents" don't actually run, they're just 3 different pointers. + agents := []*AgentWorker{ + new(AgentWorker), new(AgentWorker), new(AgentWorker), + } + + i.MarkBusy(agents[0]) + i.MarkIdle(agents[1]) + i.MarkDead(agents[2]) + + // The idle monitor should start exiting within 1 second of the agents all + // being idle or dead. 
+ start := time.Now() + i.MarkIdle(agents[0]) + select { + case <-i.Exiting(): + // This case should win, but only after the timeout. + if exitedAfter := time.Since(start); exitedAfter < idleTimeout { + t.Errorf("exitedAfter = %v, want > %v", exitedAfter, idleTimeout) + } + + case <-time.After(2 * idleTimeout): + // TODO: use testing/synctest when that becomes available + t.Error("timed out waiting on <-i.Exiting()") + } +} + +func TestIdleMonitor_AllDead(t *testing.T) { + t.Parallel() + + idleTimeout := 100 * time.Millisecond + i := NewIdleMonitor(t.Context(), 3, idleTimeout) + + agents := []*AgentWorker{ + new(AgentWorker), new(AgentWorker), new(AgentWorker), + } + + // All agents dead should result in exiting instantly. + i.MarkDead(agents[0]) + i.MarkDead(agents[1]) + + start := time.Now() + i.MarkDead(agents[2]) + + select { + case <-i.Exiting(): + // This case should win, quickly. + if exitedAfter := time.Since(start); exitedAfter > idleTimeout { + t.Errorf("exitedAfter = %v, want < %v", exitedAfter, idleTimeout) + } + case <-time.After(idleTimeout): + // TODO: use testing/synctest when that becomes available + t.Error("timed out waiting on <-i.Exiting()") + } +} + +func TestIdleMonitor_Busy(t *testing.T) { + t.Parallel() + + idleTimeout := 100 * time.Millisecond + i := NewIdleMonitor(t.Context(), 3, idleTimeout) + + agents := []*AgentWorker{ + new(AgentWorker), new(AgentWorker), new(AgentWorker), + } + + // Any agent still busy should not cause an exit. + i.MarkDead(agents[0]) + i.MarkDead(agents[1]) + i.MarkBusy(agents[2]) + + select { + case <-i.Exiting(): + t.Error("<-i.Exiting() happened while at least one agent was still busy") + + case <-time.After(2 * idleTimeout): + // This case should win. + } +} diff --git a/api/jobs.go b/api/jobs.go index 7eb0986ae9..17f1a93daa 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -85,8 +85,8 @@ func (c *Client) AcquireJob(ctx context.Context, id string, headers ...Header) ( // AcceptJob accepts the passed in job. 
Returns the job with its finalized set of // environment variables (when a job is accepted, the agents environment is // applied to the job) -func (c *Client) AcceptJob(ctx context.Context, job *Job) (*Job, *Response, error) { - u := fmt.Sprintf("jobs/%s/accept", railsPathEscape(job.ID)) +func (c *Client) AcceptJob(ctx context.Context, jobID string) (*Job, *Response, error) { + u := fmt.Sprintf("jobs/%s/accept", railsPathEscape(jobID)) req, err := c.newRequest(ctx, "PUT", u, nil) if err != nil { diff --git a/api/pings_streaming.go b/api/pings_streaming.go new file mode 100644 index 0000000000..f591680a30 --- /dev/null +++ b/api/pings_streaming.go @@ -0,0 +1,72 @@ +package api + +import ( + "context" + "fmt" + "iter" + "net/url" + + "connectrpc.com/connect" + agentedgev1 "github.com/buildkite/agent/v3/api/proto/gen" + "github.com/buildkite/agent/v3/api/proto/gen/agentedgev1connect" +) + +// StreamPings opens a ConnectRPC channel for streaming pings. It returns an +// iterator over received messages and any error that occurs. +func (c *Client) StreamPings(ctx context.Context, agentID string, opts ...connect.ClientOption) (iter.Seq2[*agentedgev1.StreamPingsResponse, error], error) { + // The streaming endpoint is the same as the main endpoint, + // minus the `/v3/`. + u, err := url.Parse(c.conf.Endpoint) + if err != nil { + return nil, fmt.Errorf("parsing endpoint: %w", err) + } + u.Path = "/" + + cl := agentedgev1connect.NewAgentEdgeServiceClient( + c.client, + u.String(), + connect.WithGRPC(), + connect.WithClientOptions(opts...), + ) + + // In order to set request headers, we need to tweak a value set in the + // context. To me, this feels too much like burying optional parameters + // in a context, which I think is bad - https://pkg.go.dev/context says: + // "Use context Values only for request-scoped data that transits processes + // and APIs, not for passing optional parameters to functions." 
+ ctx, callInfo := connect.NewClientContext(ctx) + h := callInfo.RequestHeader() + + // Add any request headers specified by the server during register/ping + for k, values := range c.requestHeaders { + for _, v := range values { + h.Add(k, v) + } + } + + // The Authorization header is added by the custom transport. + // Other methods add User-Agent in newRequest. + // Note that this does not set the entire header. + // ConnectRPC takes our value here and adds its own component *before* our + // own, which violates the convention of decreasing importance + // (see RFC 7231 section 5.5.3). + h.Set("User-Agent", c.conf.UserAgent) + stream, err := cl.StreamPings(ctx, connect.NewRequest(&agentedgev1.StreamPingsRequest{ + AgentId: agentID, + })) + if err != nil { + return nil, fmt.Errorf("from StreamPings: %w", err) + } + + return func(yield func(*agentedgev1.StreamPingsResponse, error) bool) { + defer stream.Close() //nolint:errcheck // Best-effort cleanup + for stream.Receive() { + if !yield(stream.Msg(), nil) { + return + } + } + if err := stream.Err(); err != nil { + yield(nil, err) + } + }, nil +} diff --git a/api/proto/agentedge.proto b/api/proto/agentedge.proto new file mode 100644 index 0000000000..4e2645ab11 --- /dev/null +++ b/api/proto/agentedge.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +package agentedge.v1; + +import "buf/validate/validate.proto"; + +message StreamPingsRequest { + string agent_id = 1; +} + +message StreamPingsResponse { + + oneof action { + ResumeAction resume = 2; + PauseAction pause = 3; + DisconnectAction disconnect = 4; + JobAssignedAction job_assigned = 5; + } +} + +message ResumeAction {} + +message PauseAction { + string reason = 1; +} + +message DisconnectAction { + string reason = 1; +} + +message JobAssignedAction { + Job job = 1; +} + +message Job { + string id = 1; +} + +service AgentEdgeService { + rpc StreamPings(StreamPingsRequest) returns (stream StreamPingsResponse) {} +} diff --git a/api/proto/buf.gen.yaml 
b/api/proto/buf.gen.yaml new file mode 100644 index 0000000000..9cb9c34231 --- /dev/null +++ b/api/proto/buf.gen.yaml @@ -0,0 +1,18 @@ +version: v2 +plugins: + - local: protoc-gen-go + out: gen + opt: + - paths=source_relative + - local: protoc-gen-connect-go + out: gen + opt: + - paths=source_relative +managed: + enabled: true + override: + - file_option: go_package_prefix + value: github.com/buildkite/agent/v3/api/proto/gen + disable: + - file_option: go_package + module: buf.build/bufbuild/protovalidate diff --git a/api/proto/buf.lock b/api/proto/buf.lock new file mode 100644 index 0000000000..bb66131a69 --- /dev/null +++ b/api/proto/buf.lock @@ -0,0 +1,6 @@ +# Generated by buf. DO NOT EDIT. +version: v2 +deps: + - name: buf.build/bufbuild/protovalidate + commit: 52f32327d4b045a79293a6ad4e7e1236 + digest: b5:cbabc98d4b7b7b0447c9b15f68eeb8a7a44ef8516cb386ac5f66e7fd4062cd6723ed3f452ad8c384b851f79e33d26e7f8a94e2b807282b3def1cd966c7eace97 diff --git a/api/proto/buf.yaml b/api/proto/buf.yaml new file mode 100644 index 0000000000..5685844978 --- /dev/null +++ b/api/proto/buf.yaml @@ -0,0 +1,10 @@ +# For details on buf.yaml configuration, visit https://buf.build/docs/configuration/v2/buf-yaml +version: v2 +deps: + - buf.build/bufbuild/protovalidate +lint: + use: + - STANDARD +breaking: + use: + - FILE diff --git a/api/proto/gen/agentedge.pb.go b/api/proto/gen/agentedge.pb.go new file mode 100644 index 0000000000..08c33aebac --- /dev/null +++ b/api/proto/gen/agentedge.pb.go @@ -0,0 +1,488 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. 
+// versions: +// protoc-gen-go v1.36.10 +// protoc (unknown) +// source: agentedge.proto + +package agentedgev1 + +import ( + _ "buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go/buf/validate" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type StreamPingsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + AgentId string `protobuf:"bytes,1,opt,name=agent_id,json=agentId,proto3" json:"agent_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamPingsRequest) Reset() { + *x = StreamPingsRequest{} + mi := &file_agentedge_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamPingsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamPingsRequest) ProtoMessage() {} + +func (x *StreamPingsRequest) ProtoReflect() protoreflect.Message { + mi := &file_agentedge_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamPingsRequest.ProtoReflect.Descriptor instead. 
+func (*StreamPingsRequest) Descriptor() ([]byte, []int) { + return file_agentedge_proto_rawDescGZIP(), []int{0} +} + +func (x *StreamPingsRequest) GetAgentId() string { + if x != nil { + return x.AgentId + } + return "" +} + +type StreamPingsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Action: + // + // *StreamPingsResponse_Resume + // *StreamPingsResponse_Pause + // *StreamPingsResponse_Disconnect + // *StreamPingsResponse_JobAssigned + Action isStreamPingsResponse_Action `protobuf_oneof:"action"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StreamPingsResponse) Reset() { + *x = StreamPingsResponse{} + mi := &file_agentedge_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamPingsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamPingsResponse) ProtoMessage() {} + +func (x *StreamPingsResponse) ProtoReflect() protoreflect.Message { + mi := &file_agentedge_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamPingsResponse.ProtoReflect.Descriptor instead. 
+func (*StreamPingsResponse) Descriptor() ([]byte, []int) { + return file_agentedge_proto_rawDescGZIP(), []int{1} +} + +func (x *StreamPingsResponse) GetAction() isStreamPingsResponse_Action { + if x != nil { + return x.Action + } + return nil +} + +func (x *StreamPingsResponse) GetResume() *ResumeAction { + if x != nil { + if x, ok := x.Action.(*StreamPingsResponse_Resume); ok { + return x.Resume + } + } + return nil +} + +func (x *StreamPingsResponse) GetPause() *PauseAction { + if x != nil { + if x, ok := x.Action.(*StreamPingsResponse_Pause); ok { + return x.Pause + } + } + return nil +} + +func (x *StreamPingsResponse) GetDisconnect() *DisconnectAction { + if x != nil { + if x, ok := x.Action.(*StreamPingsResponse_Disconnect); ok { + return x.Disconnect + } + } + return nil +} + +func (x *StreamPingsResponse) GetJobAssigned() *JobAssignedAction { + if x != nil { + if x, ok := x.Action.(*StreamPingsResponse_JobAssigned); ok { + return x.JobAssigned + } + } + return nil +} + +type isStreamPingsResponse_Action interface { + isStreamPingsResponse_Action() +} + +type StreamPingsResponse_Resume struct { + Resume *ResumeAction `protobuf:"bytes,2,opt,name=resume,proto3,oneof"` +} + +type StreamPingsResponse_Pause struct { + Pause *PauseAction `protobuf:"bytes,3,opt,name=pause,proto3,oneof"` +} + +type StreamPingsResponse_Disconnect struct { + Disconnect *DisconnectAction `protobuf:"bytes,4,opt,name=disconnect,proto3,oneof"` +} + +type StreamPingsResponse_JobAssigned struct { + JobAssigned *JobAssignedAction `protobuf:"bytes,5,opt,name=job_assigned,json=jobAssigned,proto3,oneof"` +} + +func (*StreamPingsResponse_Resume) isStreamPingsResponse_Action() {} + +func (*StreamPingsResponse_Pause) isStreamPingsResponse_Action() {} + +func (*StreamPingsResponse_Disconnect) isStreamPingsResponse_Action() {} + +func (*StreamPingsResponse_JobAssigned) isStreamPingsResponse_Action() {} + +type ResumeAction struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields 
protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ResumeAction) Reset() { + *x = ResumeAction{} + mi := &file_agentedge_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ResumeAction) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ResumeAction) ProtoMessage() {} + +func (x *ResumeAction) ProtoReflect() protoreflect.Message { + mi := &file_agentedge_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ResumeAction.ProtoReflect.Descriptor instead. +func (*ResumeAction) Descriptor() ([]byte, []int) { + return file_agentedge_proto_rawDescGZIP(), []int{2} +} + +type PauseAction struct { + state protoimpl.MessageState `protogen:"open.v1"` + Reason string `protobuf:"bytes,1,opt,name=reason,proto3" json:"reason,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PauseAction) Reset() { + *x = PauseAction{} + mi := &file_agentedge_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PauseAction) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PauseAction) ProtoMessage() {} + +func (x *PauseAction) ProtoReflect() protoreflect.Message { + mi := &file_agentedge_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PauseAction.ProtoReflect.Descriptor instead. 
+func (*PauseAction) Descriptor() ([]byte, []int) { + return file_agentedge_proto_rawDescGZIP(), []int{3} +} + +func (x *PauseAction) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + +type DisconnectAction struct { + state protoimpl.MessageState `protogen:"open.v1"` + Reason string `protobuf:"bytes,1,opt,name=reason,proto3" json:"reason,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DisconnectAction) Reset() { + *x = DisconnectAction{} + mi := &file_agentedge_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DisconnectAction) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DisconnectAction) ProtoMessage() {} + +func (x *DisconnectAction) ProtoReflect() protoreflect.Message { + mi := &file_agentedge_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DisconnectAction.ProtoReflect.Descriptor instead. 
+func (*DisconnectAction) Descriptor() ([]byte, []int) { + return file_agentedge_proto_rawDescGZIP(), []int{4} +} + +func (x *DisconnectAction) GetReason() string { + if x != nil { + return x.Reason + } + return "" +} + +type JobAssignedAction struct { + state protoimpl.MessageState `protogen:"open.v1"` + Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *JobAssignedAction) Reset() { + *x = JobAssignedAction{} + mi := &file_agentedge_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *JobAssignedAction) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JobAssignedAction) ProtoMessage() {} + +func (x *JobAssignedAction) ProtoReflect() protoreflect.Message { + mi := &file_agentedge_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JobAssignedAction.ProtoReflect.Descriptor instead. 
+func (*JobAssignedAction) Descriptor() ([]byte, []int) { + return file_agentedge_proto_rawDescGZIP(), []int{5} +} + +func (x *JobAssignedAction) GetJob() *Job { + if x != nil { + return x.Job + } + return nil +} + +type Job struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Job) Reset() { + *x = Job{} + mi := &file_agentedge_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Job) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Job) ProtoMessage() {} + +func (x *Job) ProtoReflect() protoreflect.Message { + mi := &file_agentedge_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Job.ProtoReflect.Descriptor instead. 
+func (*Job) Descriptor() ([]byte, []int) { + return file_agentedge_proto_rawDescGZIP(), []int{6} +} + +func (x *Job) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +var File_agentedge_proto protoreflect.FileDescriptor + +const file_agentedge_proto_rawDesc = "" + + "\n" + + "\x0fagentedge.proto\x12\fagentedge.v1\x1a\x1bbuf/validate/validate.proto\"/\n" + + "\x12StreamPingsRequest\x12\x19\n" + + "\bagent_id\x18\x01 \x01(\tR\aagentId\"\x90\x02\n" + + "\x13StreamPingsResponse\x124\n" + + "\x06resume\x18\x02 \x01(\v2\x1a.agentedge.v1.ResumeActionH\x00R\x06resume\x121\n" + + "\x05pause\x18\x03 \x01(\v2\x19.agentedge.v1.PauseActionH\x00R\x05pause\x12@\n" + + "\n" + + "disconnect\x18\x04 \x01(\v2\x1e.agentedge.v1.DisconnectActionH\x00R\n" + + "disconnect\x12D\n" + + "\fjob_assigned\x18\x05 \x01(\v2\x1f.agentedge.v1.JobAssignedActionH\x00R\vjobAssignedB\b\n" + + "\x06action\"\x0e\n" + + "\fResumeAction\"%\n" + + "\vPauseAction\x12\x16\n" + + "\x06reason\x18\x01 \x01(\tR\x06reason\"*\n" + + "\x10DisconnectAction\x12\x16\n" + + "\x06reason\x18\x01 \x01(\tR\x06reason\"8\n" + + "\x11JobAssignedAction\x12#\n" + + "\x03job\x18\x01 \x01(\v2\x11.agentedge.v1.JobR\x03job\"\x15\n" + + "\x03Job\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id2j\n" + + "\x10AgentEdgeService\x12V\n" + + "\vStreamPings\x12 .agentedge.v1.StreamPingsRequest\x1a!.agentedge.v1.StreamPingsResponse\"\x000\x01B\xac\x01\n" + + "\x10com.agentedge.v1B\x0eAgentedgeProtoP\x01Z7github.com/buildkite/agent/v3/api/proto/gen;agentedgev1\xa2\x02\x03AXX\xaa\x02\fAgentedge.V1\xca\x02\fAgentedge\\V1\xe2\x02\x18Agentedge\\V1\\GPBMetadata\xea\x02\rAgentedge::V1b\x06proto3" + +var ( + file_agentedge_proto_rawDescOnce sync.Once + file_agentedge_proto_rawDescData []byte +) + +func file_agentedge_proto_rawDescGZIP() []byte { + file_agentedge_proto_rawDescOnce.Do(func() { + file_agentedge_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_agentedge_proto_rawDesc), 
len(file_agentedge_proto_rawDesc))) + }) + return file_agentedge_proto_rawDescData +} + +var file_agentedge_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_agentedge_proto_goTypes = []any{ + (*StreamPingsRequest)(nil), // 0: agentedge.v1.StreamPingsRequest + (*StreamPingsResponse)(nil), // 1: agentedge.v1.StreamPingsResponse + (*ResumeAction)(nil), // 2: agentedge.v1.ResumeAction + (*PauseAction)(nil), // 3: agentedge.v1.PauseAction + (*DisconnectAction)(nil), // 4: agentedge.v1.DisconnectAction + (*JobAssignedAction)(nil), // 5: agentedge.v1.JobAssignedAction + (*Job)(nil), // 6: agentedge.v1.Job +} +var file_agentedge_proto_depIdxs = []int32{ + 2, // 0: agentedge.v1.StreamPingsResponse.resume:type_name -> agentedge.v1.ResumeAction + 3, // 1: agentedge.v1.StreamPingsResponse.pause:type_name -> agentedge.v1.PauseAction + 4, // 2: agentedge.v1.StreamPingsResponse.disconnect:type_name -> agentedge.v1.DisconnectAction + 5, // 3: agentedge.v1.StreamPingsResponse.job_assigned:type_name -> agentedge.v1.JobAssignedAction + 6, // 4: agentedge.v1.JobAssignedAction.job:type_name -> agentedge.v1.Job + 0, // 5: agentedge.v1.AgentEdgeService.StreamPings:input_type -> agentedge.v1.StreamPingsRequest + 1, // 6: agentedge.v1.AgentEdgeService.StreamPings:output_type -> agentedge.v1.StreamPingsResponse + 6, // [6:7] is the sub-list for method output_type + 5, // [5:6] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_agentedge_proto_init() } +func file_agentedge_proto_init() { + if File_agentedge_proto != nil { + return + } + file_agentedge_proto_msgTypes[1].OneofWrappers = []any{ + (*StreamPingsResponse_Resume)(nil), + (*StreamPingsResponse_Pause)(nil), + (*StreamPingsResponse_Disconnect)(nil), + (*StreamPingsResponse_JobAssigned)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: 
protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_agentedge_proto_rawDesc), len(file_agentedge_proto_rawDesc)), + NumEnums: 0, + NumMessages: 7, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_agentedge_proto_goTypes, + DependencyIndexes: file_agentedge_proto_depIdxs, + MessageInfos: file_agentedge_proto_msgTypes, + }.Build() + File_agentedge_proto = out.File + file_agentedge_proto_goTypes = nil + file_agentedge_proto_depIdxs = nil +} diff --git a/api/proto/gen/agentedgev1connect/agentedge.connect.go b/api/proto/gen/agentedgev1connect/agentedge.connect.go new file mode 100644 index 0000000000..f2738d669f --- /dev/null +++ b/api/proto/gen/agentedgev1connect/agentedge.connect.go @@ -0,0 +1,109 @@ +// Code generated by protoc-gen-connect-go. DO NOT EDIT. +// +// Source: agentedge.proto + +package agentedgev1connect + +import ( + connect "connectrpc.com/connect" + context "context" + errors "errors" + gen "github.com/buildkite/agent/v3/api/proto/gen" + http "net/http" + strings "strings" +) + +// This is a compile-time assertion to ensure that this generated file and the connect package are +// compatible. If you get a compiler error that this constant is not defined, this code was +// generated with a version of connect newer than the one compiled into your binary. You can fix the +// problem by either regenerating this code with an older version of connect or updating the connect +// version compiled into your binary. +const _ = connect.IsAtLeastVersion1_13_0 + +const ( + // AgentEdgeServiceName is the fully-qualified name of the AgentEdgeService service. + AgentEdgeServiceName = "agentedge.v1.AgentEdgeService" +) + +// These constants are the fully-qualified names of the RPCs defined in this package. They're +// exposed at runtime as Spec.Procedure and as the final two segments of the HTTP route. 
+// +// Note that these are different from the fully-qualified method names used by +// google.golang.org/protobuf/reflect/protoreflect. To convert from these constants to +// reflection-formatted method names, remove the leading slash and convert the remaining slash to a +// period. +const ( + // AgentEdgeServiceStreamPingsProcedure is the fully-qualified name of the AgentEdgeService's + // StreamPings RPC. + AgentEdgeServiceStreamPingsProcedure = "/agentedge.v1.AgentEdgeService/StreamPings" +) + +// AgentEdgeServiceClient is a client for the agentedge.v1.AgentEdgeService service. +type AgentEdgeServiceClient interface { + StreamPings(context.Context, *connect.Request[gen.StreamPingsRequest]) (*connect.ServerStreamForClient[gen.StreamPingsResponse], error) +} + +// NewAgentEdgeServiceClient constructs a client for the agentedge.v1.AgentEdgeService service. By +// default, it uses the Connect protocol with the binary Protobuf Codec, asks for gzipped responses, +// and sends uncompressed requests. To use the gRPC or gRPC-Web protocols, supply the +// connect.WithGRPC() or connect.WithGRPCWeb() options. +// +// The URL supplied here should be the base URL for the Connect or gRPC server (for example, +// http://api.acme.com or https://acme.com/grpc). +func NewAgentEdgeServiceClient(httpClient connect.HTTPClient, baseURL string, opts ...connect.ClientOption) AgentEdgeServiceClient { + baseURL = strings.TrimRight(baseURL, "/") + agentEdgeServiceMethods := gen.File_agentedge_proto.Services().ByName("AgentEdgeService").Methods() + return &agentEdgeServiceClient{ + streamPings: connect.NewClient[gen.StreamPingsRequest, gen.StreamPingsResponse]( + httpClient, + baseURL+AgentEdgeServiceStreamPingsProcedure, + connect.WithSchema(agentEdgeServiceMethods.ByName("StreamPings")), + connect.WithClientOptions(opts...), + ), + } +} + +// agentEdgeServiceClient implements AgentEdgeServiceClient. 
+type agentEdgeServiceClient struct { + streamPings *connect.Client[gen.StreamPingsRequest, gen.StreamPingsResponse] +} + +// StreamPings calls agentedge.v1.AgentEdgeService.StreamPings. +func (c *agentEdgeServiceClient) StreamPings(ctx context.Context, req *connect.Request[gen.StreamPingsRequest]) (*connect.ServerStreamForClient[gen.StreamPingsResponse], error) { + return c.streamPings.CallServerStream(ctx, req) +} + +// AgentEdgeServiceHandler is an implementation of the agentedge.v1.AgentEdgeService service. +type AgentEdgeServiceHandler interface { + StreamPings(context.Context, *connect.Request[gen.StreamPingsRequest], *connect.ServerStream[gen.StreamPingsResponse]) error +} + +// NewAgentEdgeServiceHandler builds an HTTP handler from the service implementation. It returns the +// path on which to mount the handler and the handler itself. +// +// By default, handlers support the Connect, gRPC, and gRPC-Web protocols with the binary Protobuf +// and JSON codecs. They also support gzip compression. +func NewAgentEdgeServiceHandler(svc AgentEdgeServiceHandler, opts ...connect.HandlerOption) (string, http.Handler) { + agentEdgeServiceMethods := gen.File_agentedge_proto.Services().ByName("AgentEdgeService").Methods() + agentEdgeServiceStreamPingsHandler := connect.NewServerStreamHandler( + AgentEdgeServiceStreamPingsProcedure, + svc.StreamPings, + connect.WithSchema(agentEdgeServiceMethods.ByName("StreamPings")), + connect.WithHandlerOptions(opts...), + ) + return "/agentedge.v1.AgentEdgeService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case AgentEdgeServiceStreamPingsProcedure: + agentEdgeServiceStreamPingsHandler.ServeHTTP(w, r) + default: + http.NotFound(w, r) + } + }) +} + +// UnimplementedAgentEdgeServiceHandler returns CodeUnimplemented from all methods. 
+type UnimplementedAgentEdgeServiceHandler struct{} + +func (UnimplementedAgentEdgeServiceHandler) StreamPings(context.Context, *connect.Request[gen.StreamPingsRequest], *connect.ServerStream[gen.StreamPingsResponse]) error { + return connect.NewError(connect.CodeUnimplemented, errors.New("agentedge.v1.AgentEdgeService.StreamPings is not implemented")) +} diff --git a/api/retryable.go b/api/retryable.go index beb5f2d811..4666477aa3 100644 --- a/api/retryable.go +++ b/api/retryable.go @@ -5,7 +5,6 @@ import ( "net" "net/http" "net/url" - "slices" "strings" "syscall" ) @@ -20,17 +19,17 @@ var retrableErrorSuffixes = []string{ io.EOF.Error(), } -var retryableStatuses = []int{ - http.StatusTooManyRequests, // 429 - http.StatusInternalServerError, // 500 - http.StatusBadGateway, // 502 - http.StatusServiceUnavailable, // 503 - http.StatusGatewayTimeout, // 504 +var retryableStatuses = map[int]bool{ + http.StatusTooManyRequests: true, // 429 + http.StatusInternalServerError: true, // 500 + http.StatusBadGateway: true, // 502 + http.StatusServiceUnavailable: true, // 503 + http.StatusGatewayTimeout: true, // 504 } // IsRetryableStatus returns true if the response's StatusCode is one that we should retry. 
func IsRetryableStatus(r *Response) bool { - return r.StatusCode >= 400 && slices.Contains(retryableStatuses, r.StatusCode) + return retryableStatuses[r.StatusCode] } // Looks at a bunch of connection related errors, and returns true if the error diff --git a/clicommand/agent_start.go b/clicommand/agent_start.go index 8d919ed520..bb3ff2d0f6 100644 --- a/clicommand/agent_start.go +++ b/clicommand/agent_start.go @@ -184,13 +184,15 @@ type AgentStartConfig struct { TraceContextEncoding string `cli:"trace-context-encoding"` NoMultipartArtifactUpload bool `cli:"no-multipart-artifact-upload"` + // API + agent behaviour + PingMode string `cli:"ping-mode"` + // API config DebugHTTP bool `cli:"debug-http"` TraceHTTP bool `cli:"trace-http"` Token string `cli:"token" validate:"required"` Endpoint string `cli:"endpoint" validate:"required"` NoHTTP2 bool `cli:"no-http2"` - // Deprecated KubernetesLogCollectionGracePeriod time.Duration `cli:"kubernetes-log-collection-grace-period"` NoSSHFingerprintVerification bool `cli:"no-automatic-ssh-fingerprint-verification" deprecated-and-renamed-to:"NoSSHKeyscan"` @@ -759,6 +761,14 @@ var AgentStartCommand = cli.Command{ EnvVar: "BUILDKITE_AGENT_DISABLE_WARNINGS_FOR", }, + // API + agent behaviour + cli.StringFlag{ + Name: "ping-mode", + Usage: "Selects available protocols for dispatching work to this agent. 
One of auto (default), ping-only, stream-only.", + Value: "auto", + EnvVar: "BUILDKITE_AGENT_PING_MODE", + }, + // API Flags AgentRegisterTokenFlag, // != AgentAccessToken EndpointFlag, @@ -1083,6 +1093,7 @@ var AgentStartCommand = cli.Command{ TraceContextEncoding: cfg.TraceContextEncoding, AllowMultipartArtifactUpload: !cfg.NoMultipartArtifactUpload, KubernetesExec: cfg.KubernetesExec, + PingMode: cfg.PingMode, SigningJWKSFile: cfg.SigningJWKSFile, SigningJWKSKeyID: cfg.SigningJWKSKeyID, @@ -1311,7 +1322,7 @@ var AgentStartCommand = cli.Command{ } // Setup the agent pool that spawns agent workers - pool := agent.NewAgentPool(workers) + pool := agent.NewAgentPool(workers, &agentConf) // Agent-wide shutdown hook. Once per agent, for all workers on the agent. defer agentShutdownHook(l, cfg) diff --git a/go.mod b/go.mod index 09a2bfb282..4069266721 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.24.0 toolchain go1.24.5 require ( + buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20251209175733-2a1774d88802.1 cloud.google.com/go/compute/metadata v0.9.0 + connectrpc.com/connect v1.19.1 drjosh.dev/zzglob v0.4.2 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4 @@ -63,6 +65,7 @@ require ( golang.org/x/sys v0.41.0 golang.org/x/term v0.40.0 google.golang.org/api v0.260.0 + google.golang.org/protobuf v1.36.11 gopkg.in/DataDog/dd-trace-go.v1 v1.74.8 gopkg.in/yaml.v3 v3.0.1 gotest.tools/v3 v3.5.2 @@ -206,7 +209,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/grpc v1.78.0 // indirect - google.golang.org/protobuf v1.36.11 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gotest.tools/gotestsum v1.13.0 // indirect diff --git a/go.sum b/go.sum index ce3773acbe..531634e49b 100644 --- a/go.sum +++ 
b/go.sum @@ -1,9 +1,13 @@ +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20251209175733-2a1774d88802.1 h1:j9yeqTWEFrtimt8Nng2MIeRrpoCvQzM9/g25XTvqUGg= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20251209175733-2a1774d88802.1/go.mod h1:tvtbpgaVXZX4g6Pn+AnzFycuRK3MOz5HJfEGeEllXYM= cloud.google.com/go/auth v0.18.0 h1:wnqy5hrv7p3k7cShwAU/Br3nzod7fxoqG+k0VZ+/Pk0= cloud.google.com/go/auth v0.18.0/go.mod h1:wwkPM1AgE1f2u6dG443MiWoD8C3BtOywNsUMcUTVDRo= cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIiLpZnkHRbnc= cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c= cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs= cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= +connectrpc.com/connect v1.19.1 h1:R5M57z05+90EfEvCY1b7hBxDVOUl45PrtXtAV2fOC14= +connectrpc.com/connect v1.19.1/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w= drjosh.dev/zzglob v0.4.2 h1:q+e5Cp6SFCyz+Yurhk/edSrTKEk3tn60vzoaXLmtiBo= drjosh.dev/zzglob v0.4.2/go.mod h1:SbYDdesQC13iyGiEwV8dJfJbyz7/Qiawrd5ODdJQCoo= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.20.0 h1:JXg2dwJUmPB9JmtVmdEB16APJ7jurfbY5jnfXpJoRMc= diff --git a/internal/e2e/basic_test.go b/internal/e2e/basic_test.go index 9862fb16b4..d8cc9434ab 100644 --- a/internal/e2e/basic_test.go +++ b/internal/e2e/basic_test.go @@ -3,8 +3,10 @@ package e2e import ( + "context" "strings" "testing" + "time" ) func TestBasicE2E(t *testing.T) { @@ -13,7 +15,59 @@ func TestBasicE2E(t *testing.T) { tc.startAgent() build := tc.triggerBuild() - state := tc.waitForBuild(ctx, build) + + // It should take much less time than 1 minute to successfully run the job. 
+ waitCtx, canc := context.WithTimeout(ctx, 1*time.Minute) + defer canc() + + state := tc.waitForBuild(waitCtx, build) + if got, want := state, "passed"; got != want { + t.Errorf("Build state = %q, want %q", got, want) + } + + logs := tc.fetchLogs(ctx, build) + if !strings.Contains(logs, "hello world") { + t.Errorf("tc.fetchLogs(ctx, build %q) logs as follows, did not contain 'hello world'\n%s", build.ID, logs) + } +} + +func TestBasicE2E_PingOnly(t *testing.T) { + ctx := t.Context() + tc := newTestCase(t, "basic_e2e.yaml") + + tc.startAgent("--ping-mode=ping-only") + build := tc.triggerBuild() + + // It should take much less time than 1 minute to successfully run the job. + waitCtx, canc := context.WithTimeout(ctx, 1*time.Minute) + defer canc() + + state := tc.waitForBuild(waitCtx, build) + if got, want := state, "passed"; got != want { + t.Errorf("Build state = %q, want %q", got, want) + } + + logs := tc.fetchLogs(ctx, build) + if !strings.Contains(logs, "hello world") { + t.Errorf("tc.fetchLogs(ctx, build %q) logs as follows, did not contain 'hello world'\n%s", build.ID, logs) + } +} + +func TestBasicE2E_StreamOnly(t *testing.T) { + ctx := t.Context() + tc := newTestCase(t, "basic_e2e.yaml") + + tc.startAgent( + "--ping-mode=stream-only", + "--endpoint=https://agent-edge.buildkite.com/v3", + ) + build := tc.triggerBuild() + + // It should take much less time than 2 minutes to successfully run the job. + waitCtx, canc := context.WithTimeout(ctx, 2*time.Minute) + defer canc() + + state := tc.waitForBuild(waitCtx, build) if got, want := state, "passed"; got != want { t.Errorf("Build state = %q, want %q", got, want) }