diff --git a/cmd/root.go b/cmd/root.go index c2babb05..5ddb2a8b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -84,8 +84,6 @@ func NewRootCmd(cfg *env.Env, tel *telemetry.Client, logger log.Logger) *cobra.C func Execute(ctx context.Context) error { cfg := env.Init() - tel := telemetry.New(cfg.AnalyticsEndpoint, cfg.DisableEvents) - defer tel.Close() logger, cleanup, err := newLogger() if err != nil { @@ -108,6 +106,10 @@ func Execute(ctx context.Context) error { logger.Error("failed to shut down tracing: %v", err) } }() + + tel := telemetry.New(cfg.AnalyticsEndpoint, cfg.DisableEvents) + defer tel.Close() + logger.Info("lstk %s starting", version.Version()) // Resolve auth token for telemetry: keyring first, then env var. diff --git a/internal/awscli/exec.go b/internal/awscli/exec.go index 1768ab3c..ae46ba48 100644 --- a/internal/awscli/exec.go +++ b/internal/awscli/exec.go @@ -9,13 +9,22 @@ import ( "os/exec" "strings" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "github.com/localstack/lstk/internal/awsconfig" "github.com/localstack/lstk/internal/output" ) func Exec(ctx context.Context, endpointURL string, useProfile bool, stdout, stderr io.Writer, args []string) error { + ctx, span := otel.Tracer("github.com/localstack/lstk/internal/awscli").Start(ctx, "aws cli") + defer span.End() + awsBin, err := exec.LookPath("aws") if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("aws CLI not found in PATH — install it from https://aws.amazon.com/cli/") } @@ -30,6 +39,11 @@ func Exec(ctx context.Context, endpointURL string, useProfile bool, stdout, stde } cmdArgs = append(cmdArgs, args...) + span.SetAttributes( + attribute.StringSlice("aws.args", args), + attribute.Bool("aws.use_profile", useProfile), + ) + cmd := exec.CommandContext(ctx, awsBin, cmdArgs...) cmd.Stdin = os.Stdin cmd.Stdout = stdout @@ -41,8 +55,12 @@ func Exec(ctx context.Context, endpointURL string, useProfile bool, stdout, stde if err := cmd.Run(); err != nil { var exitErr *exec.ExitError if errors.As(err, &exitErr) { + span.SetAttributes(attribute.Int("aws.exit_code", exitErr.ExitCode())) + span.SetStatus(codes.Error, "aws cli exited non-zero") return output.NewSilentError(err) } + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return err } return nil diff --git a/internal/telemetry/client.go b/internal/telemetry/client.go index 7af8be8a..8b1ad5aa 100644 --- a/internal/telemetry/client.go +++ b/internal/telemetry/client.go @@ -12,6 +12,8 @@ import ( "time" "github.com/google/uuid" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "github.com/localstack/lstk/internal/version" ) @@ -28,9 +30,10 @@ type Client struct { httpClient *http.Client endpoint string - events chan eventBody - done chan struct{} - closeOnce sync.Once + events chan eventBody + done chan struct{} + closeOnce sync.Once + machineIDOnce sync.Once } // SetAuthToken stores the resolved auth token for inclusion in telemetry events. @@ -46,11 +49,16 @@ func New(endpoint string, disabled bool) *Client { c := &Client{ enabled: true, sessionID: uuid.NewString(), - machineID: LoadOrCreateMachineID(), // http.Client has no default timeout (zero means none). Without one, a // slow or unreachable endpoint would block the worker goroutine. httpClient: &http.Client{ Timeout: 3 * time.Second, + Transport: otelhttp.NewTransport( + http.DefaultTransport, + otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string { + return "telemetry " + r.Method + " " + r.URL.Path + }), + ), }, endpoint: endpoint, events: make(chan eventBody, 64), diff --git a/internal/telemetry/events.go b/internal/telemetry/events.go index 81bd4309..6233764b 100644 --- a/internal/telemetry/events.go +++ b/internal/telemetry/events.go @@ -91,24 +91,24 @@ func ToMap(v any) map[string]any { // GetEnvironment returns the common environment payload for telemetry events, // using the auth token set via SetAuthToken. -func (c *Client) GetEnvironment() Environment { - env := Environment{ +func (c *Client) GetEnvironment(ctx context.Context) Environment { + c.machineIDOnce.Do(func() { + c.machineID = LoadOrCreateMachineID(ctx) + }) + return Environment{ LstkVersion: version.Version(), AuthTokenID: c.authToken, OS: runtime.GOOS, Arch: runtime.GOARCH, + MachineID: c.machineID, } - if c.machineID != "" { - env.MachineID = c.machineID - } - return env } // EmitCommand emits an lstk_command telemetry event. The Environment block is // populated automatically from the client state. func (c *Client) EmitCommand(ctx context.Context, command string, flags []string, durationMS int64, exitCode int, errorMsg string) { c.Emit(ctx, "lstk_command", ToMap(CommandEvent{ - Environment: c.GetEnvironment(), + Environment: c.GetEnvironment(ctx), Parameters: CommandParameters{Command: command, Flags: flags}, Result: CommandResult{ DurationMS: durationMS, @@ -122,6 +122,6 @@ func (c *Client) EmitCommand(ctx context.Context, command string, flags []string // Environment field is populated automatically from the client state; any // value set by the caller is overwritten. func (c *Client) EmitEmulatorLifecycleEvent(ctx context.Context, event LifecycleEvent) { - event.Environment = c.GetEnvironment() + event.Environment = c.GetEnvironment(ctx) c.Emit(ctx, "lstk_lifecycle", ToMap(event)) } diff --git a/internal/telemetry/events_test.go b/internal/telemetry/events_test.go index ed7e7720..4749027c 100644 --- a/internal/telemetry/events_test.go +++ b/internal/telemetry/events_test.go @@ -48,7 +48,7 @@ func drainEvent(t *testing.T, tel *Client, ch <-chan map[string]any) map[string] func TestGetEnvironment_PopulatesAllFields(t *testing.T) { c := New("http://localhost", false) c.SetAuthToken("ls-abc123") - env := c.GetEnvironment() + env := c.GetEnvironment(context.Background()) assert.Equal(t, version.Version(), env.LstkVersion) assert.Equal(t, "ls-abc123", env.AuthTokenID) @@ -59,7 +59,7 @@ func TestGetEnvironment_PopulatesAllFields(t *testing.T) { func TestGetEnvironment_OmitsAuthTokenWhenEmpty(t *testing.T) { c := New("http://localhost", false) - env := c.GetEnvironment() + env := c.GetEnvironment(context.Background()) assert.Empty(t, env.AuthTokenID) } diff --git a/internal/telemetry/machine_id.go b/internal/telemetry/machine_id.go index e6046afb..db3a6bbc 100644 --- a/internal/telemetry/machine_id.go +++ b/internal/telemetry/machine_id.go @@ -4,12 +4,15 @@ import ( "context" "crypto/md5" "encoding/hex" + "net/http" "os" "path/filepath" "strings" dockerclient "github.com/docker/docker/client" "github.com/google/uuid" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "github.com/localstack/lstk/internal/config" ) @@ -22,8 +25,8 @@ const ( // trying in order: Docker daemon ID, /etc/machine-id, then a persisted random UUID. // Prefixes (dkr_, sys_, gen_) indicate origin, matching the Python implementation // in localstack-core so IDs can be correlated across tools. -func LoadOrCreateMachineID() string { - if id := dockerDaemonID(); id != "" { +func LoadOrCreateMachineID(ctx context.Context) string { + if id := dockerDaemonID(ctx); id != "" { return "dkr_" + anonymize(id) } if id := systemMachineID(); id != "" { @@ -37,13 +40,21 @@ func anonymize(physicalID string) string { return hex.EncodeToString(h[:])[:12] } -func dockerDaemonID() string { - c, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) +func dockerDaemonID(ctx context.Context) string { + c, err := dockerclient.NewClientWithOpts( + dockerclient.FromEnv, + dockerclient.WithAPIVersionNegotiation(), + dockerclient.WithTraceOptions( + otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string { + return "docker " + r.Method + " " + r.URL.Path + }), + ), + ) if err != nil { return "" } defer func() { _ = c.Close() }() - info, err := c.Info(context.Background()) + info, err := c.Info(ctx) if err != nil { return "" } diff --git a/test/integration/env/env.go b/test/integration/env/env.go index f47876be..c9b93149 100644 --- a/test/integration/env/env.go +++ b/test/integration/env/env.go @@ -18,6 +18,8 @@ const ( DisableEvents Key = "LOCALSTACK_DISABLE_EVENTS" Home Key = "HOME" Persistence Key = "LOCALSTACK_PERSISTENCE" + Otel Key = "LSTK_OTEL" + OtelEndpoint Key = "OTEL_EXPORTER_OTLP_ENDPOINT" ) func Get(key Key) string { diff --git a/test/integration/telemetry_test.go b/test/integration/telemetry_test.go index 6331400a..c22a4c45 100644 --- a/test/integration/telemetry_test.go +++ b/test/integration/telemetry_test.go @@ -1,6 +1,8 @@ package integration_test import ( + "bytes" + "compress/gzip" "context" "encoding/json" "io" @@ -9,6 +11,7 @@ import ( "os" "os/exec" "runtime" + "sync" "testing" "time" @@ -232,6 +235,78 @@ func assertCommandTelemetry(t *testing.T, events <-chan map[string]any, command assert.InDelta(t, exitCode, result["exit_code"], 0) } +// TestOtelTelemetrySpanIsExported verifies that the span created for the +// analytics POST request reaches the OTLP collector. +func TestOtelTelemetrySpanIsExported(t *testing.T) { + requireDocker(t) + cleanup() + t.Cleanup(cleanup) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + startTestContainer(t, ctx) + + analyticsSrv, _ := mockAnalyticsServer(t) + otlpSrv, otlpBodies := mockOTLPCollector(t) + + cmd := exec.CommandContext(ctx, binaryPath(), "start") + cmd.Env = env.Environ(testEnvWithHome(t.TempDir(), "")). + With(env.AuthToken, "fake-token"). + With(env.AnalyticsEndpoint, analyticsSrv.URL). + With(env.Otel, "1"). + With(env.OtelEndpoint, otlpSrv.URL) + out, err := cmd.CombinedOutput() + require.NoError(t, err, "lstk start failed: %s", out) + + deadline := time.After(5 * time.Second) + for { + select { + case body := <-otlpBodies: + // otlptracehttp serializes spans as protobuf; UTF-8 strings appear + // inline in the wire format, so a substring search is sufficient. + if bytes.Contains(body, []byte("telemetry POST")) { + return + } + case <-deadline: + t.Fatal("timed out waiting for telemetry span in OTLP export — likely tel.Close() ran after tracing shutdown") + } + } +} + +// mockOTLPCollector returns a test server that accepts OTLP/HTTP trace exports +// and forwards each (decompressed) request body to the returned channel. +func mockOTLPCollector(t *testing.T) (*httptest.Server, <-chan []byte) { + t.Helper() + bodies := make(chan []byte, 16) + var once sync.Once + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var reader io.Reader = r.Body + if r.Header.Get("Content-Encoding") == "gzip" { + gz, err := gzip.NewReader(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + defer func() { _ = gz.Close() }() + reader = gz + } + body, err := io.ReadAll(reader) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + select { + case bodies <- body: + default: + once.Do(func() { t.Logf("OTLP body channel full, dropping payload of %d bytes", len(body)) }) + } + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(srv.Close) + return srv, bodies +} + // collects events until count distinct event names have been received or the deadline expires. func collectTelemetryByName(t *testing.T, events <-chan map[string]any, count int) map[string]map[string]any { t.Helper()