Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions ddtrace/contrib/internal/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@
Ray instrumentation is experimental. It is deactivated by default. To enable it,
you have to follow one of the two methods below:

The recommended way to instrument Ray, is to instrument the Ray cluster.
You can do it by starting the Ray head with a tracing startup hook::
The recommended way to instrument Ray is to instrument the Ray cluster using ddtrace-run::

ray start --head --tracing-startup-hook=ddtrace.contrib.ray:setup_tracing
DD_PATCH_MODULES="ray:true, aiohttp:false, grpc:false, requests:false" ddtrace-run ray start --head

Otherwise, you can specify the tracing hook in `ray.init()` using::
Setting DD_PATCH_MODULES as shown reduces noise by sending only the job-related spans.

ray.init(_tracing_startup_hook="ddtrace.contrib.ray:setup_tracing")
You can also do it by starting the Ray head with a tracing startup hook::

Note that this method does not provide full tracing capabilities.
ray start --head --tracing-startup-hook=ddtrace.contrib.ray:setup_tracing

Note that this method does not provide full tracing capabilities if ``ray.init()`` is not called at the top
of your job scripts.

Configuration
~~~~~~~~~~~~~
Expand All @@ -44,21 +45,17 @@
(default: ``True``). If ``True``, file paths in the entrypoint will be redacted to avoid
leaking sensitive information.

Ray service name can be configured by:
The Ray service name can be configured by, in order of precedence:

- specifying in submission ID using ``job:your-job-name`` during job submission::
- specifying ``DD_SERVICE`` when initializing your Ray cluster.

ray job submit --submission-id="job:my_model,run:39" -- python entrypoint.py
- setting ``DD_TRACE_RAY_USE_ENTRYPOINT_AS_SERVICE_NAME=True``. In this case, the service
name will be the name of your entrypoint script.

- specifying in metadata during job submission::

ray job submit --metadata-json='{"job_name": "my_model"}' -- python entrypoint.py

- specifying ``DD_SERVICE`` when initializing your Ray cluster.

- setting ``DD_TRACE_RAY_USE_ENTRYPOINT_AS_SERVICE_NAME=True``. In this case, the service
name will be the name of your entrypoint script.

By default, the service name will be ``unnamed.ray.job``.

Notes
Expand Down
52 changes: 34 additions & 18 deletions ddtrace/contrib/internal/ray/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,14 @@
RAY_SERVICE_NAME = os.environ.get(RAY_JOB_NAME)

# Ray modules that should be excluded from tracing
RAY_MODULE_DENYLIST = {
"ray.dag",
RAY_COMMON_MODULE_DENYLIST = {
"ray.data._internal",
}

RAY_TASK_MODULE_DENYLIST = {*RAY_COMMON_MODULE_DENYLIST}

RAY_ACTOR_MODULE_DENYLIST = {
*RAY_COMMON_MODULE_DENYLIST,
"ray.experimental",
"ray.data._internal",
}
Expand Down Expand Up @@ -143,6 +149,11 @@ def _wrap_task_execution(wrapped, *args, **kwargs):

def traced_submit_task(wrapped, instance, args, kwargs):
"""Trace task submission, i.e the func.remote() call"""

# Tracing doesn't work for cross lang yet.
if instance._function.__module__ in RAY_TASK_MODULE_DENYLIST or instance._is_cross_language:
return wrapped(*args, **kwargs)

if tracer.current_span() is None:
log.debug(
"No active span found in %s.remote(), activating trace context from environment", instance._function_name
Expand All @@ -153,11 +164,10 @@ def traced_submit_task(wrapped, instance, args, kwargs):
# This is done under a lock because multiple tasks could be submitted at the same time
# and would thus try to modify the signature at the same time
with instance._inject_lock:
if not getattr(instance._function, "_dd_trace_wrapped", False):
if instance._function_signature is None:
instance._function = _wrap_remote_function_execution(instance._function)
instance._function.__signature__ = _inject_dd_trace_ctx_kwarg(instance._function)
instance._function_signature = extract_signature(instance._function)
instance._function._dd_trace_wrapped = True

with tracer.trace(
"task.submit",
Expand Down Expand Up @@ -197,22 +207,24 @@ def traced_submit_job(wrapped, instance, args, kwargs):
"""
from ray.dashboard.modules.job.job_manager import generate_job_id

# Three ways of specifying the job name, in order of precedence:
# 1. Metadata JSON: ray job submit --metadata_json '{"job_name": "train.cool.model"}' train.py
# 2. Special submission ID format: ray job submit --submission_id "job:train.cool.model,run:38" train.py
# 3. Ray entrypoint: ray job submit train_cool_model.py
# Three ways of setting the service name of the spans, in order of precedence:
# - DD_SERVICE environment variable
# - The name of the entrypoint if DD_TRACE_RAY_USE_ENTRYPOINT_AS_SERVICE_NAME is True
# - Metadata JSON: ray job submit --metadata_json '{"job_name": "train.cool.model"}'
# Otherwise set to unnamed.ray.job
submission_id = kwargs.get("submission_id") or generate_job_id()
kwargs["submission_id"] = submission_id

entrypoint = kwargs.get("entrypoint", "")
if entrypoint and config.ray.redact_entrypoint_paths:
if config.ray.redact_entrypoint_paths:
entrypoint = redact_paths(entrypoint)
job_name = config.service or kwargs.get("metadata", {}).get("job_name", "")

if not job_name:
if config.ray.use_entrypoint_as_service_name:
job_name = get_dd_job_name_from_entrypoint(entrypoint) or DEFAULT_JOB_NAME
else:
job_name = DEFAULT_JOB_NAME
if config.ray.use_entrypoint_as_service_name:
job_name = get_dd_job_name_from_entrypoint(entrypoint) or DEFAULT_JOB_NAME
else:
user_provided_service = config.service if config._is_user_provided_service else None
metadata_job_name = kwargs.get("metadata", {}).get("job_name", None)
job_name = user_provided_service or metadata_job_name or DEFAULT_JOB_NAME

job_span = tracer.start_span("ray.job", service=job_name or DEFAULT_JOB_NAME, span_type=SpanTypes.RAY)
try:
Expand Down Expand Up @@ -380,12 +392,12 @@ def traced_wait(wrapped, instance, args, kwargs):


def _job_supervisor_run_wrapper(method: Callable[..., Any]) -> Any:
async def _traced_run_method(self: Any, *args: Any, _dd_ray_trace_ctx, **kwargs: Any) -> Any:
async def _traced_run_method(self: Any, *args: Any, _dd_ray_trace_ctx=None, **kwargs: Any) -> Any:
import ray.exceptions

from ddtrace.ext import SpanTypes

context = _TraceContext._extract(_dd_ray_trace_ctx)
context = _TraceContext._extract(_dd_ray_trace_ctx) if _dd_ray_trace_ctx else None
submission_id = os.environ.get(RAY_SUBMISSION_ID)

with long_running_ray_span(
Expand Down Expand Up @@ -497,7 +509,7 @@ def inject_tracing_into_actor_class(wrapped, instance, args, kwargs):
class_name = str(cls.__name__)

# Skip tracing for certain ray modules
if any(module_name.startswith(denied_module) for denied_module in RAY_MODULE_DENYLIST):
if any(module_name.startswith(denied_module) for denied_module in RAY_ACTOR_MODULE_DENYLIST):
return cls

# Actor beginning with _ are considered internal and will not be traced
Expand Down Expand Up @@ -557,6 +569,10 @@ def patch():

ray._datadog_patch = True

from ray.util.tracing import tracing_helper

tracing_helper._global_is_tracing_enabled = False

@ModuleWatchdog.after_module_imported("ray.actor")
def _(m):
_w(m.ActorHandle, "_actor_method_call", traced_actor_method_call)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
fixes:
- |
ray: This fix resolves an issue where Ray jobs that did not explicitly call ``ray.init()`` at the top of their scripts
were not properly instrumented, resulting in incomplete traces.
To ensure full tracing capabilities, use ``ddtrace-run`` when starting your Ray cluster:
``DD_PATCH_MODULES="ray:true,aiohttp:false,grpc:false,requests:false" ddtrace-run ray start --head``.
62 changes: 0 additions & 62 deletions tests/contrib/ray/jobs/actor_and_task.py

This file was deleted.

49 changes: 0 additions & 49 deletions tests/contrib/ray/jobs/actor_interactions.py

This file was deleted.

24 changes: 24 additions & 0 deletions tests/contrib/ray/jobs/actor_without_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import ray


@ray.remote
class Counter:
    """Minimal Ray actor that holds an integer counter starting at zero."""

    def __init__(self):
        # Every new actor instance begins counting from zero.
        self.value = 0

    def increment(self):
        """Add one to the counter and return the updated value."""
        updated = self.value + 1
        self.value = updated
        return updated

    def get_value(self):
        """Return the current counter value without changing it."""
        return self.value


def main():
    """Increment a fresh ``Counter`` actor once and verify it reports 1.

    Note that this script never calls ``ray.init()`` explicitly before
    creating the actor.
    """
    actor = Counter.remote()
    pending = actor.increment.remote()
    # ray.get resolves the object reference returned by the remote call.
    result = ray.get(pending)
    assert result == 1, f"Expected 1, got {result}"


# Run the scenario only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
21 changes: 0 additions & 21 deletions tests/contrib/ray/jobs/error_in_task.py

This file was deleted.

27 changes: 0 additions & 27 deletions tests/contrib/ray/jobs/nested_tasks.py

This file was deleted.

16 changes: 16 additions & 0 deletions tests/contrib/ray/jobs/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import ray


@ray.remote
def add_one(x):
    """Return ``x`` incremented by one."""
    incremented = x + 1
    return incremented


def main():
    """Submit ``add_one(0)`` as a remote call and verify the result is 1."""
    # Resolve the remote call's object reference in a single step.
    result = ray.get(add_one.remote(0))
    assert result == 1, f"Unexpected results: {result}"


# Run the scenario only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Loading
Loading