Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func startTraceHandling(ctx context.Context, trc *tracer.Tracer) error {
// Spawn monitors for the various result maps
traceCh := make(chan *host.Trace)

if err := trc.StartMapMonitors(ctx, traceCh); err != nil {
if err := trc.StartMapMonitors(traceCh); err != nil {
return fmt.Errorf("failed to start map monitors: %v", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func Test_Golabels(t *testing.T) {

traceCh := make(chan *host.Trace)

err = trc.StartMapMonitors(ctx, traceCh)
err = trc.StartMapMonitors(traceCh)
require.NoError(t, err)

go func() {
Expand Down
8 changes: 7 additions & 1 deletion processmanager/ebpf/asyncupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package ebpf // import "go.opentelemetry.io/ebpf-profiler/processmanager/ebpf"
import (
"context"
"errors"
"sync"
"unsafe"

cebpf "github.com/cilium/ebpf"
Expand Down Expand Up @@ -43,6 +44,7 @@ import (
// versa.
type asyncMapUpdaterPool struct {
	// workers holds the per-queue update workers spawned when the pool
	// is created; each owns one buffered asyncMapInMapUpdate queue.
	workers []*asyncUpdateWorker
	// wg tracks the worker goroutines (Add before spawn, Done on exit)
	// so shutdown can wait for all of them to finish before maps close.
	wg sync.WaitGroup
}

// newAsyncMapUpdaterPool creates a new worker pool
Expand All @@ -52,7 +54,11 @@ func newAsyncMapUpdaterPool(ctx context.Context,
for range numWorkers {
queue := make(chan asyncMapInMapUpdate, workerQueueCapacity)
worker := &asyncUpdateWorker{ctx: ctx, queue: queue}
go worker.serve()
pool.wg.Add(1)
go func() {
defer pool.wg.Done()
worker.serve()
}()
pool.workers = append(pool.workers, worker)
}
return pool
Expand Down
8 changes: 8 additions & 0 deletions processmanager/ebpf/ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ func LoadMaps(ctx context.Context, maps map[string]*cebpf.Map) (ebpfapi.EbpfHand
return impl, nil
}

// WaitAsyncUpdates blocks until every background async map update worker
// has exited. It is intended to be called during shutdown, before the
// eBPF maps are closed, so in-flight updates never hit a closed map.
func (impl *ebpfMapsImpl) WaitAsyncUpdates() {
	if impl.updateWorkers == nil {
		return
	}
	impl.updateWorkers.wg.Wait()
}

// UpdateInterpreterOffsets adds the given moduleRanges to the eBPF map interpreterOffsets.
func (impl *ebpfMapsImpl) UpdateInterpreterOffsets(ebpfProgIndex uint16, fileID host.FileID,
offsetRanges []util.Range) error {
Expand Down
6 changes: 6 additions & 0 deletions processmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ func collectInterpreterMetrics(ctx context.Context, pm *ProcessManager,
}

func (pm *ProcessManager) Close() {
	// When the eBPF handler exposes WaitAsyncUpdates, block here until its
	// background async map update workers have exited; otherwise closing
	// the maps while updates are still in flight can surface as EBADF.
	type ebpfWaiter interface{ WaitAsyncUpdates() }
	waiter, ok := any(pm.ebpf).(ebpfWaiter)
	if ok {
		waiter.WaitAsyncUpdates()
	}
}

func (pm *ProcessManager) symbolizeFrame(pid libpf.PID, bpfFrame *host.Frame, frames *libpf.Frames) error {
Expand Down
2 changes: 1 addition & 1 deletion tracer/ebpf_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestTraceTransmissionAndParsing(t *testing.T) {
require.NoError(t, err)

traceChan := make(chan *host.Trace, 16)
err = tr.StartMapMonitors(ctx, traceChan)
err = tr.StartMapMonitors(traceChan)
require.NoError(t, err)

runKernelFrameProbe(t, tr)
Expand Down
37 changes: 30 additions & 7 deletions tracer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ const (

// StartPIDEventProcessor spawns a goroutine to process PID events.
//
// The goroutine is registered with t.wg before launch and is bound to the
// Tracer's own context (t.ctx) so that Close can cancel it and then wait
// for it to exit deterministically. The ctx parameter is retained for API
// compatibility; the Tracer-internal context governs the goroutine's
// lifetime.
func (t *Tracer) StartPIDEventProcessor(ctx context.Context) {
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		t.processPIDEvents(t.ctx)
	}()
}

// Process the PID events that are incoming in the Tracer channel.
Expand Down Expand Up @@ -88,18 +92,22 @@ func (t *Tracer) triggerPidEvent(data []byte) {
// calls. Returns a function that can be called to retrieve perf event array
// error counts.
func startPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map,
triggerFunc func([]byte), perCPUBufferSize int) func() (lost, noData, readError uint64) {
triggerFunc func([]byte), perCPUBufferSize int, registerCloser func(func())) func() (lost, noData, readError uint64) {
eventReader, err := perf.NewReader(perfEventMap, perCPUBufferSize)
if err != nil {
log.Fatalf("Failed to setup perf reporting via %s: %v", perfEventMap, err)
}
if registerCloser != nil {
registerCloser(func() { _ = eventReader.Close() })
}

var lostEventsCount, readErrorCount, noDataCount atomic.Uint64
go func() {
var data perf.Record
for {
select {
case <-ctx.Done():
_ = eventReader.Close()
return
default:
if err := eventReader.ReadInto(&data); err != nil {
Expand Down Expand Up @@ -148,7 +156,12 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
eventReader.SetDeadline(time.Unix(1, 0))

var lostEventsCount, readErrorCount, noDataCount atomic.Uint64
// Ensure the reader can be closed from Tracer.Close to unblock goroutines.
t.registerCloser(func() { _ = eventReader.Close() })

t.wg.Add(1)
go func() {
defer t.wg.Done()
var data perf.Record
var oldKTime, minKTime times.KTime
var eventCount int
Expand All @@ -162,6 +175,7 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
select {
// Always check for context cancellation in each iteration
case <-ctx.Done():
_ = eventReader.Close()
break PollLoop
default:
// Continue below
Expand All @@ -170,6 +184,7 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
select {
// This context cancellation check may not execute in timely manner
case <-ctx.Done():
_ = eventReader.Close()
break PollLoop
case <-pollTicker.C:
// Continue execution below
Expand Down Expand Up @@ -216,9 +231,12 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
if minKTime == 0 || trace.KTime < minKTime {
minKTime = trace.KTime
}
// TODO: This per-event channel send couples event processing in the rest of
// the agent with event reading from the perf buffers slowing down the latter.
traceOutChan <- trace
select {
case traceOutChan <- trace:
case <-ctx.Done():
_ = eventReader.Close()
break PollLoop
}
if eventCount == maxEvents {
// Break this inner loop to ensure ProcessedUntil logic executes
break
Expand All @@ -244,7 +262,12 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
// ProcessedUntil(t1) first and t0 next, with t0 < t1.
if oldKTime > 0 {
// Ensure that all previously sent trace events have been processed
traceOutChan <- nil
select {
case traceOutChan <- nil:
case <-ctx.Done():
_ = eventReader.Close()
break PollLoop
}

if minKTime > 0 && minKTime <= oldKTime {
// If minKTime is smaller than oldKTime, use it and reset it
Expand Down Expand Up @@ -280,7 +303,7 @@ func (t *Tracer) startEventMonitor(ctx context.Context) func() []metrics.Metric
log.Fatalf("Map report_events is not available")
}

getPerfErrorCounts := startPerfEventMonitor(ctx, eventMap, t.triggerPidEvent, os.Getpagesize())
getPerfErrorCounts := startPerfEventMonitor(ctx, eventMap, t.triggerPidEvent, os.Getpagesize(), t.registerCloser)
return func() []metrics.Metric {
lost, noData, readError := getPerfErrorCounts()

Expand Down
113 changes: 88 additions & 25 deletions tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"math"
"math/rand/v2"
"strings"
"sync"
"time"
"unsafe"

Expand Down Expand Up @@ -115,6 +116,20 @@ type Tracer struct {

// probabilisticThreshold holds the threshold for probabilistic profiling.
probabilisticThreshold uint

// ctx and cancelFunc are used to manage the lifecycle of background goroutines.
ctx context.Context
cancelFunc context.CancelFunc

// closers contains deferred cleanup functions for resources (e.g., perf readers).
closersMu sync.Mutex
closers []func()

// closeOnce ensures Close is idempotent and thread-safe.
closeOnce sync.Once

// wg tracks background goroutines owned by Tracer for deterministic shutdown.
wg sync.WaitGroup
}

type Config struct {
Expand Down Expand Up @@ -194,6 +209,7 @@ func schedProcessFreeHookName(progNames libpf.Set[string]) string {

// NewTracer loads eBPF code and map definitions from the ELF module at the configured path.
func NewTracer(ctx context.Context, cfg *Config) (*Tracer, error) {
tracerCtx, cancelFunc := context.WithCancel(ctx)
kernelSymbolizer, err := kallsyms.NewSymbolizer()
if err != nil {
return nil, fmt.Errorf("failed to read kernel symbols: %v", err)
Expand All @@ -210,14 +226,14 @@ func NewTracer(ctx context.Context, cfg *Config) (*Tracer, error) {
return nil, fmt.Errorf("failed to load eBPF code: %v", err)
}

ebpfHandler, err := pmebpf.LoadMaps(ctx, ebpfMaps)
ebpfHandler, err := pmebpf.LoadMaps(tracerCtx, ebpfMaps)
if err != nil {
return nil, fmt.Errorf("failed to load eBPF maps: %v", err)
}

hasBatchOperations := ebpfHandler.SupportsGenericBatchOperations()

processManager, err := pm.New(ctx, cfg.IncludeTracers, cfg.Intervals.MonitorInterval(),
processManager, err := pm.New(tracerCtx, cfg.IncludeTracers, cfg.Intervals.MonitorInterval(),
ebpfHandler, nil, cfg.TraceReporter, cfg.ExecutableReporter,
elfunwindinfo.NewStackDeltaProvider(),
cfg.FilterErrorFrames, cfg.IncludeEnvVars)
Expand All @@ -241,6 +257,8 @@ func NewTracer(ctx context.Context, cfg *Config) (*Tracer, error) {
samplesPerSecond: cfg.SamplesPerSecond,
probabilisticInterval: cfg.ProbabilisticInterval,
probabilisticThreshold: cfg.ProbabilisticThreshold,
ctx: tracerCtx,
cancelFunc: cancelFunc,
}

return tracer, nil
Expand All @@ -249,27 +267,72 @@ func NewTracer(ctx context.Context, cfg *Config) (*Tracer, error) {
// Close provides functionality for Tracer to perform cleanup tasks.
// NOTE: Close may be called multiple times in succession.
func (t *Tracer) Close() {
events := t.perfEntrypoints.WLock()
for _, event := range *events {
if err := event.Disable(); err != nil {
log.Errorf("Failed to disable perf event: %v", err)
t.closeOnce.Do(func() {
// Cancel the context to stop all background goroutines
if t.cancelFunc != nil {
t.cancelFunc()
}
if err := event.Close(); err != nil {
log.Errorf("Failed to close perf event: %v", err)

t.closersMu.Lock()
for _, f := range t.closers {
func() {
defer func() { _ = recover() }()
f()
}()
}
}
*events = nil
t.perfEntrypoints.WUnlock(&events)
t.closers = nil
t.closersMu.Unlock()

// Deterministically wait for Tracer-owned goroutines to exit
t.wg.Wait()

// Avoid resource leakage by closing all kernel hooks.
for hookPoint, hook := range t.hooks {
if err := hook.Close(); err != nil {
log.Errorf("Failed to close '%s/%s': %v", hookPoint.group, hookPoint.name, err)
// Close perf events
events := t.perfEntrypoints.WLock()
for _, event := range *events {
if err := event.Disable(); err != nil {
log.Errorf("Failed to disable perf event: %v", err)
}
if err := event.Close(); err != nil {
log.Errorf("Failed to close perf event: %v", err)
}
}
delete(t.hooks, hookPoint)
}
*events = nil
t.perfEntrypoints.WUnlock(&events)

// Avoid resource leakage by closing all kernel hooks.
for hookPoint, hook := range t.hooks {
if err := hook.Close(); err != nil {
log.Errorf("Failed to close '%s/%s': %v", hookPoint.group, hookPoint.name, err)
}
delete(t.hooks, hookPoint)
}

// Finalize process manager first, waiting background async updates to stop
t.processManager.Close()

// Close all eBPF programs
for name, prog := range t.ebpfProgs {
if err := prog.Close(); err != nil {
log.Errorf("Failed to close eBPF program '%s': %v", name, err)
}
delete(t.ebpfProgs, name)
}

// Close all eBPF maps
for name, m := range t.ebpfMaps {
if err := m.Close(); err != nil {
log.Errorf("Failed to close eBPF map '%s': %v", name, err)
}
delete(t.ebpfMaps, name)
}
})
}

t.processManager.Close()
// registerCloser records a cleanup callback that Close invokes during
// shutdown (e.g. closing perf readers to unblock reader goroutines).
func (t *Tracer) registerCloser(f func()) {
	t.closersMu.Lock()
	t.closers = append(t.closers, f)
	t.closersMu.Unlock()
}

func buildStackDeltaTemplates(coll *cebpf.CollectionSpec) error {
Expand Down Expand Up @@ -971,15 +1034,15 @@ func (t *Tracer) loadBpfTrace(raw []byte, cpu int) *host.Trace {

// StartMapMonitors starts goroutines for collecting metrics and monitoring eBPF
// maps for tracepoints, new traces, trace count updates and unknown PCs.
func (t *Tracer) StartMapMonitors(ctx context.Context, traceOutChan chan<- *host.Trace) error {
if err := t.kernelSymbolizer.StartMonitor(ctx); err != nil {
func (t *Tracer) StartMapMonitors(traceOutChan chan<- *host.Trace) error {
if err := t.kernelSymbolizer.StartMonitor(t.ctx); err != nil {
log.Warnf("Failed to start kallsyms monitor: %v", err)
}
eventMetricCollector := t.startEventMonitor(ctx)
traceEventMetricCollector := t.startTraceEventMonitor(ctx, traceOutChan)
eventMetricCollector := t.startEventMonitor(t.ctx)
traceEventMetricCollector := t.startTraceEventMonitor(t.ctx, traceOutChan)

pidEvents := make([]libpf.PIDTID, 0)
periodiccaller.StartWithManualTrigger(ctx, t.intervals.MonitorInterval(),
periodiccaller.StartWithManualTrigger(t.ctx, t.intervals.MonitorInterval(),
t.triggerPIDProcessing, func(_ bool) {
t.enableEvent(support.EventTypeGenericPID)
t.monitorPIDEventsMap(&pidEvents)
Expand All @@ -1001,7 +1064,7 @@ func (t *Tracer) StartMapMonitors(ctx context.Context, traceOutChan chan<- *host
// calculate and store delta values.
previousMetricValue := make([]metrics.MetricValue, len(translateIDs))

periodiccaller.Start(ctx, t.intervals.MonitorInterval(), func() {
periodiccaller.Start(t.ctx, t.intervals.MonitorInterval(), func() {
metrics.AddSlice(eventMetricCollector())
metrics.AddSlice(traceEventMetricCollector())
metrics.AddSlice(t.eBPFMetricsCollector(translateIDs, previousMetricValue))
Expand Down Expand Up @@ -1114,7 +1177,7 @@ func (t *Tracer) StartProbabilisticProfiling(ctx context.Context) {
// before getting called.
t.probabilisticProfile(t.probabilisticInterval, t.probabilisticThreshold)

periodiccaller.Start(ctx, t.probabilisticInterval, func() {
periodiccaller.Start(t.ctx, t.probabilisticInterval, func() {
t.probabilisticProfile(t.probabilisticInterval, t.probabilisticThreshold)
})
}
Expand Down