Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions ddtrace/profiling/collector/_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(
frame: FrameType = sys._getframe(3)
code: CodeType = frame.f_code
self.init_location: str = f"{os.path.basename(code.co_filename)}:{frame.f_lineno}"
self.acquired_time: int = 0
self.acquired_time: Optional[int] = None
self.name: Optional[str] = None

### DUNDER methods ###
Expand Down Expand Up @@ -106,6 +106,13 @@ def __aenter__(self, *args: Any, **kwargs: Any) -> Any:

def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
if not self.capture_sampler.capture():
if config.enable_asserts:
# Ensure acquired_time is not set when acquire is not sampled
# (else a bogus release sample is produced)
assert self.acquired_time is None, (
f"Expected acquired_time to be None when acquire is not sampled, got {self.acquired_time!r}"
) # nosec

return inner_func(*args, **kwargs)

start: int = time.monotonic_ns()
Expand Down Expand Up @@ -136,21 +143,12 @@ def __aexit__(self, *args: Any, **kwargs: Any) -> Any:

def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
    """Release the wrapped lock and emit a release sample if the acquire was sampled.

    Args:
        inner_func: The underlying release callable; invoked as
            ``inner_func(*args, **kwargs)``.
        *args: Positional arguments forwarded to ``inner_func``.
        **kwargs: Keyword arguments forwarded to ``inner_func``.

    Returns:
        Whatever ``inner_func`` returns (the annotation is ``None``; the
        value is passed through unchanged).

    Raises:
        Any exception raised by ``inner_func`` is propagated as-is, e.g. the
        ``RuntimeError`` from releasing an unlocked lock.
    """
    # Tolerate an instance on which ``acquired_time`` was never assigned.
    start: Optional[int] = getattr(self, "acquired_time", None)

    # Reset rather than delete: assignment cannot raise AttributeError, so
    # release() being called from several threads never surfaces an internal
    # error to the caller. Clearing it before calling inner_func also ensures
    # a later release cannot reuse this timestamp.
    self.acquired_time = None

    try:
        return inner_func(*args, **kwargs)
    finally:
        # Truthiness on purpose: both ``None`` and ``0`` mean "the acquire was
        # not sampled", so no release sample may be produced for them.
        if start:
            self._flush_sample(start, end=time.monotonic_ns(), is_acquire=False)

def _flush_sample(self, start: int, end: int, is_acquire: bool) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
fixes:
- |
profiling: This fix resolves a critical issue where the Lock Profiler generated
release samples for lock acquires that were not sampled, resulting in inflated or negative (on integer overflow)
lock hold times (e.g., "3.24k days per minute", "-970 days per minute").
This affected virtually all customers using sampling rates below 100% (which should be the majority).
7 changes: 5 additions & 2 deletions tests/profiling/collector/pprof_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def __init__(self, *args, **kwargs):
super().__init__(event_type=LockEventType.RELEASE, *args, **kwargs)


def parse_newest_profile(filename_prefix: str) -> pprof_pb2.Profile:
def parse_newest_profile(filename_prefix: str, assert_samples: bool = True) -> pprof_pb2.Profile:
"""Parse the newest profile that has given filename prefix. The profiler
outputs profile file with following naming convention:
<filename_prefix>.<pid>.<counter>.pprof, and in tests, we'd want to parse
Expand All @@ -150,7 +150,10 @@ def parse_newest_profile(filename_prefix: str) -> pprof_pb2.Profile:
serialized_data = dctx.stream_reader(fp).read()
profile = pprof_pb2.Profile()
profile.ParseFromString(serialized_data)
assert len(profile.sample) > 0, "No samples found in profile"

if assert_samples:
assert len(profile.sample) > 0, "No samples found in profile"

return profile


Expand Down
24 changes: 22 additions & 2 deletions tests/profiling_v2/collector/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import sys
import threading
import time
from typing import Callable
from typing import List
from typing import Optional
Expand Down Expand Up @@ -1139,8 +1140,6 @@ def test_lock_slots_enforced(self) -> None:

def test_lock_profiling_overhead_reasonable(self) -> None:
"""Test that profiling overhead with 0% capture is bounded."""
import time

# Measure without profiling (collector stopped)
regular_lock: LockClassInst = self.lock_class()
start: float = time.perf_counter()
Expand Down Expand Up @@ -1168,6 +1167,27 @@ def test_lock_profiling_overhead_reasonable(self) -> None:
overhead_multiplier < 50
), f"Overhead too high: {overhead_multiplier}x (regular: {regular_time:.6f}s, profiled: {profiled_time_zero:.6f}s)" # noqa: E501

def test_release_not_sampled_when_acquire_not_sampled(self) -> None:
    """Verify that an unsampled acquire never yields a release sample."""
    cycles = 10

    # capture_pct=0 means no acquire is ever selected for sampling
    with self.collector_class(capture_pct=0):
        profiled_lock: LockClassInst = self.lock_class()
        # Run several acquire/release round trips, holding the lock briefly each time
        for _ in range(cycles):
            profiled_lock.acquire()
            time.sleep(0.001)
            profiled_lock.release()

    ddup.upload()

    # The profile may legitimately be empty here, so skip the "has samples" check
    parsed: pprof_pb2.Profile = pprof_utils.parse_newest_profile(
        self.output_filename,
        assert_samples=False,
    )
    release_samples: List[pprof_pb2.Sample] = pprof_utils.get_samples_with_value_type(
        parsed,
        "lock-release",
    )

    # No acquire was sampled, so no release sample may exist
    assert (
        len(release_samples) == 0
    ), f"Expected no release samples when acquire wasn't sampled, got {len(release_samples)}"


class TestThreadingLockCollector(BaseThreadingLockCollectorTest):
"""Test Lock profiling"""
Expand Down
Loading