diff --git a/CHANGELOG.md b/CHANGELOG.md index 36d084e..1477978 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,33 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.1.0] - 2026-04-20 + +### Added +- **`Last9LogToSpanProcessor`**: new OTel `LogRecordProcessor` that promotes + GenAI log events emitted by `opentelemetry-instrumentation-openai-v2` (new + GenAI semconv) onto the currently active span as both flat span attributes + and indexed attributes so the Last9 LLM dashboard renders prompts, + completions, and tool calls. + - Flat attrs: `gen_ai.prompt`, `gen_ai.completion` (JSON arrays) + - Span events: `gen_ai.content.prompt`, `gen_ai.content.completion` + - Indexed attrs: `gen_ai.prompt.{i}.*`, `gen_ai.completion.{i}.*` + (AgentOps / Traceloop compatible) +- `Last9SpanProcessor` now accepts an optional `log_processor=` kwarg; per-span + counter state in the bridge is released when its span ends. + +### Fixed +- LLM dashboard now shows user/assistant/tool messages for apps using the new + GenAI semconv (openai-v2) — previously these payloads were only emitted as + log records and never reached the dashboard. + +### Notes +- Python 3.14 users must pin `wrapt<2` because + `opentelemetry-instrumentation-openai-v2` 2.3b0 calls + `wrap_function_wrapper(module=..., name=..., wrapper=...)` and wrapt 2.0 + renamed the first kwarg to `target=`. Without the pin, instrumentation fails + silently and no log events are emitted. + ## [1.0.0] - 2026-02-14 ### Added diff --git a/README.md b/README.md index 2d67a75..d36b3e1 100644 --- a/README.md +++ b/README.md @@ -301,6 +301,54 @@ print(f"Cost: ${cost.total:.6f}") **Result**: You get standard OTel attributes (automatic) + Last9 cost/workflow (manual). 
+### Capturing Prompts, Completions, and Tool Calls + +`opentelemetry-instrumentation-openai-v2` (v2.x) follows the new OpenTelemetry +GenAI semantic conventions and emits message content, tool calls, and +completions as **OTel log events**, not as span attributes. The Last9 LLM +dashboard reads span attributes / events, so without a bridge those payloads +never reach the dashboard. + +`Last9LogToSpanProcessor` listens to those log events and promotes their +payloads onto the currently active span: + +- `gen_ai.prompt` (JSON array of prompt messages) +- `gen_ai.completion` (JSON array of completion choices) +- span events `gen_ai.content.prompt` / `gen_ai.content.completion` +- indexed `gen_ai.prompt.{i}.*` / `gen_ai.completion.{i}.*` (AgentOps / + Traceloop compatible) + +```python +from opentelemetry import trace, _logs +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor + +from last9_genai import Last9SpanProcessor, Last9LogToSpanProcessor + +log_bridge = Last9LogToSpanProcessor() + +tracer_provider = TracerProvider() +tracer_provider.add_span_processor(Last9SpanProcessor(log_processor=log_bridge)) +trace.set_tracer_provider(tracer_provider) + +logger_provider = LoggerProvider() +logger_provider.add_log_record_processor(log_bridge) +_logs.set_logger_provider(logger_provider) + +import os +os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = "true" +OpenAIInstrumentor().instrument(logger_provider=logger_provider) +``` + +After this, every LLM call instrumented by `openai-v2` has its full prompt +and completion content available on the span. + +> **Python 3.14 users**: pin `wrapt<2`. `opentelemetry-instrumentation-openai-v2` +> 2.3b0 calls `wrap_function_wrapper(module=..., name=..., wrapper=...)` and +> wrapt 2.0 renamed the first kwarg to `target=`. 
Without the pin, +> instrumentation fails silently and no log events are emitted. + ## Usage Examples ### Multi-Turn Conversations diff --git a/last9_genai/__init__.py b/last9_genai/__init__.py index 4dc1de1..39b2c8f 100644 --- a/last9_genai/__init__.py +++ b/last9_genai/__init__.py @@ -34,7 +34,7 @@ For more information, see: https://github.com/last9/python-ai-sdk """ -__version__ = "1.0.0" +__version__ = "1.1.0" __author__ = "Last9 Inc." __license__ = "MIT" @@ -66,6 +66,9 @@ # Import span processor (auto-enrichment) from last9_genai.processor import Last9SpanProcessor +# Import log-to-span bridge for GenAI log events +from last9_genai.log_processor import Last9LogToSpanProcessor + # Import decorators (auto-tracking) from last9_genai.decorators import observe @@ -97,6 +100,8 @@ "clear_context", # Span processor "Last9SpanProcessor", + # Log-to-span bridge + "Last9LogToSpanProcessor", # Decorators (NEW) "observe", ] diff --git a/last9_genai/log_processor.py b/last9_genai/log_processor.py new file mode 100644 index 0000000..d95bfbe --- /dev/null +++ b/last9_genai/log_processor.py @@ -0,0 +1,160 @@ +""" +Log-to-span bridge for OTel GenAI semantic conventions. + +OpenTelemetry's newer GenAI instrumentations (e.g. opentelemetry-instrumentation-openai-v2) +emit request messages, responses, and tool calls as OTel log events — not as span +attributes. Last9's LLM dashboard reads span attributes / events, so without this bridge +those payloads never reach the dashboard. 
+ +This processor promotes well-known GenAI log events onto the currently active span using +the flat + span-event scheme the Last9 LLM dashboard parses: + - Span attribute `gen_ai.prompt` : JSON array of prompt messages + - Span attribute `gen_ai.completion` : JSON array of completion choices + - Span event `gen_ai.content.prompt` : { content: } + - Span event `gen_ai.content.completion`: { completion: } + +Indexed attributes (`gen_ai.prompt.{i}.*`) are also emitted for compatibility with +AgentOps / Traceloop-style consumers. +""" + +from __future__ import annotations + +import json +import threading +from typing import Any, Dict, List + +from opentelemetry import trace +from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord + +GEN_AI_PROMPT_EVENTS = { + "gen_ai.system.message": "system", + "gen_ai.user.message": "user", + "gen_ai.assistant.message": "assistant", + "gen_ai.tool.message": "tool", +} +GEN_AI_CHOICE_EVENT = "gen_ai.choice" + + +class Last9LogToSpanProcessor(LogRecordProcessor): + """Promote GenAI log events to span attributes + events on the active span. + + Writes flat JSON-array attributes (what the Last9 LLM dashboard parses) and + indexed attributes (AgentOps/Traceloop convention) so downstream renderers + in either scheme can consume the payload. 
+ """ + + def __init__(self, max_content_length: int = 4096): + self._max = max_content_length + self._state: Dict[int, Dict[str, List[dict]]] = {} + self._lock = threading.Lock() + + def on_emit(self, log_record: ReadWriteLogRecord) -> None: + record = log_record.log_record + event_name = getattr(record, "event_name", None) + if not event_name: + return + if event_name != GEN_AI_CHOICE_EVENT and event_name not in GEN_AI_PROMPT_EVENTS: + return + + span = trace.get_current_span() + ctx = span.get_span_context() + if not ctx.is_valid or not span.is_recording(): + return + + body = record.body + if not isinstance(body, dict): + return + + with self._lock: + state = self._state.setdefault(ctx.span_id, {"prompts": [], "completions": []}) + + if event_name == GEN_AI_CHOICE_EVENT: + idx = len(state["completions"]) + entry = self._build_completion_entry(body) + state["completions"].append(entry) + self._set_completion_indexed(span, idx, entry, body) + self._set_completion_flat(span, state["completions"]) + else: + idx = len(state["prompts"]) + default_role = GEN_AI_PROMPT_EVENTS[event_name] + entry = self._build_prompt_entry(default_role, body) + state["prompts"].append(entry) + self._set_prompt_indexed(span, idx, entry, body) + self._set_prompt_flat(span, state["prompts"]) + + def cleanup_span(self, span_id: int) -> None: + """Release per-span state when the span ends (called from Last9SpanProcessor).""" + with self._lock: + self._state.pop(span_id, None) + + def shutdown(self) -> None: + with self._lock: + self._state.clear() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + def _truncate(self, value: Any) -> str: + s = value if isinstance(value, str) else json.dumps(value, default=str) + if len(s) > self._max: + return s[: self._max] + "...[truncated]" + return s + + def _build_prompt_entry(self, default_role: str, body: dict) -> dict: + entry: Dict[str, Any] = {"role": body.get("role", default_role)} + content = body.get("content") + if 
content is not None: + entry["content"] = content + if body.get("tool_calls"): + entry["tool_calls"] = body["tool_calls"] + if body.get("id"): + entry["tool_call_id"] = body["id"] + return entry + + def _build_completion_entry(self, body: dict) -> dict: + message = body.get("message") or {} + entry: Dict[str, Any] = {"role": message.get("role", "assistant")} + if message.get("content") is not None: + entry["content"] = message["content"] + if message.get("tool_calls"): + entry["tool_calls"] = message["tool_calls"] + if body.get("finish_reason") is not None: + entry["finish_reason"] = body["finish_reason"] + if body.get("index") is not None: + entry["index"] = body["index"] + return entry + + def _set_prompt_indexed(self, span, idx: int, entry: dict, body: dict) -> None: + span.set_attribute(f"gen_ai.prompt.{idx}.role", entry["role"]) + if "content" in entry: + span.set_attribute(f"gen_ai.prompt.{idx}.content", self._truncate(entry["content"])) + if "tool_calls" in entry: + span.set_attribute( + f"gen_ai.prompt.{idx}.tool_calls", self._truncate(entry["tool_calls"]) + ) + if "tool_call_id" in entry: + span.set_attribute(f"gen_ai.prompt.{idx}.tool_call.id", str(entry["tool_call_id"])) + + def _set_completion_indexed(self, span, idx: int, entry: dict, body: dict) -> None: + span.set_attribute(f"gen_ai.completion.{idx}.role", entry["role"]) + if "content" in entry: + span.set_attribute(f"gen_ai.completion.{idx}.content", self._truncate(entry["content"])) + if "tool_calls" in entry: + span.set_attribute( + f"gen_ai.completion.{idx}.tool_calls", + self._truncate(entry["tool_calls"]), + ) + if "finish_reason" in entry: + span.set_attribute(f"gen_ai.completion.{idx}.finish_reason", entry["finish_reason"]) + if "index" in entry: + span.set_attribute(f"gen_ai.completion.{idx}.index", entry["index"]) + + def _set_prompt_flat(self, span, prompts: List[dict]) -> None: + payload = self._truncate(prompts) + span.set_attribute("gen_ai.prompt", payload) + 
span.add_event("gen_ai.content.prompt", {"content": payload}) + + def _set_completion_flat(self, span, completions: List[dict]) -> None: + payload = self._truncate(completions) + span.set_attribute("gen_ai.completion", payload) + span.add_event("gen_ai.content.completion", {"completion": payload}) diff --git a/last9_genai/processor.py b/last9_genai/processor.py index 120c670..2ee0e69 100644 --- a/last9_genai/processor.py +++ b/last9_genai/processor.py @@ -52,6 +52,7 @@ def __init__( custom_pricing: Optional[Dict[str, ModelPricing]] = None, enable_cost_tracking: bool = True, workflow_tracker=None, + log_processor=None, ): """ Initialize the span processor. @@ -60,10 +61,13 @@ def __init__( custom_pricing: Dictionary of model pricing enable_cost_tracking: Whether to calculate and add cost attributes workflow_tracker: Optional workflow cost tracker instance + log_processor: Optional Last9LogToSpanProcessor whose per-span counter + state should be released when a span ends. """ self.custom_pricing = custom_pricing self.enable_cost_tracking = enable_cost_tracking self.workflow_tracker = workflow_tracker + self.log_processor = log_processor def on_start(self, span: "Span", parent_context: Optional[Context] = None) -> None: """ @@ -87,6 +91,11 @@ def on_end(self, span: ReadableSpan) -> None: Args: span: The span that just ended (read-only) """ + if self.log_processor is not None: + ctx = span.get_span_context() + if ctx.is_valid: + self.log_processor.cleanup_span(ctx.span_id) + if not span.attributes: return diff --git a/pyproject.toml b/pyproject.toml index ae58bf7..33acb41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "last9-genai" -version = "1.0.0" +version = "1.1.0" description = "Last9 observability attributes for OpenTelemetry GenAI spans - track costs, workflows, and conversations in LLM applications" readme = "README.md" license = "MIT" diff --git a/setup.py b/setup.py index 
"""Unit tests for Last9LogToSpanProcessor.

Wires the bridge into a real SDK TracerProvider/LoggerProvider pair with an
in-memory exporter, emits GenAI semconv log events while a span is current,
and asserts the promoted span attributes / events.
"""

import json

import pytest
from opentelemetry import trace
from opentelemetry._logs import LogRecord
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from last9_genai import Last9LogToSpanProcessor, Last9SpanProcessor


@pytest.fixture
def setup_providers():
    """Yield (tracer, logger, exporter, bridge) wired end-to-end.

    The bridge is registered both as a log record processor (so it sees GenAI
    events) and via Last9SpanProcessor(log_processor=...) (so its per-span
    state is cleaned up on span end). max_content_length=200 keeps the
    truncation test cheap.
    """
    exporter = InMemorySpanExporter()
    tracer_provider = TracerProvider()
    bridge = Last9LogToSpanProcessor(max_content_length=200)
    tracer_provider.add_span_processor(Last9SpanProcessor(log_processor=bridge))
    tracer_provider.add_span_processor(SimpleSpanProcessor(exporter))

    logger_provider = LoggerProvider()
    logger_provider.add_log_record_processor(bridge)

    tracer = tracer_provider.get_tracer("t")
    logger = logger_provider.get_logger("openai_v2")

    yield tracer, logger, exporter, bridge

    # Teardown: shut both providers down so state never leaks across tests.
    tracer_provider.shutdown()
    logger_provider.shutdown()


def test_prompt_event_sets_flat_and_indexed_attrs(setup_providers):
    """A user-message event yields indexed attrs, the flat JSON array, and the content event."""
    tracer, logger, exporter, _ = setup_providers
    with tracer.start_as_current_span("chat gpt-4o") as span:
        logger.emit(
            LogRecord(
                event_name="gen_ai.user.message",
                body={"role": "user", "content": "hello"},
                context=trace.set_span_in_context(span),
            )
        )

    spans = exporter.get_finished_spans()
    attrs = dict(spans[0].attributes)
    assert attrs["gen_ai.prompt.0.role"] == "user"
    assert attrs["gen_ai.prompt.0.content"] == "hello"
    # Flat attribute is a JSON array of the accumulated prompt messages.
    assert json.loads(attrs["gen_ai.prompt"]) == [{"role": "user", "content": "hello"}]

    events = {e.name: dict(e.attributes) for e in spans[0].events}
    assert "gen_ai.content.prompt" in events
    assert json.loads(events["gen_ai.content.prompt"]["content"]) == [
        {"role": "user", "content": "hello"}
    ]


def test_choice_event_sets_completion_with_tool_calls(setup_providers):
    """A gen_ai.choice event maps message/finish_reason/index/tool_calls onto completion attrs."""
    tracer, logger, exporter, _ = setup_providers
    tool_calls = [{"id": "call_1", "function": {"name": "get_weather"}}]
    with tracer.start_as_current_span("chat gpt-4o") as span:
        logger.emit(
            LogRecord(
                event_name="gen_ai.choice",
                body={
                    "index": 0,
                    "finish_reason": "tool_calls",
                    "message": {"role": "assistant", "tool_calls": tool_calls},
                },
                context=trace.set_span_in_context(span),
            )
        )

    attrs = dict(exporter.get_finished_spans()[0].attributes)
    assert attrs["gen_ai.completion.0.role"] == "assistant"
    assert attrs["gen_ai.completion.0.finish_reason"] == "tool_calls"
    assert attrs["gen_ai.completion.0.index"] == 0
    # tool_calls are JSON-serialized for the indexed attribute.
    assert json.loads(attrs["gen_ai.completion.0.tool_calls"]) == tool_calls
    assert json.loads(attrs["gen_ai.completion"])[0]["tool_calls"] == tool_calls


def test_multiple_prompts_accumulate_in_order(setup_providers):
    """Successive prompt events get increasing indices and accumulate in the flat array."""
    tracer, logger, exporter, _ = setup_providers
    with tracer.start_as_current_span("chat") as span:
        ctx = trace.set_span_in_context(span)
        logger.emit(
            LogRecord(
                event_name="gen_ai.system.message",
                body={"role": "system", "content": "sys"},
                context=ctx,
            )
        )
        logger.emit(
            LogRecord(
                event_name="gen_ai.user.message",
                body={"role": "user", "content": "hi"},
                context=ctx,
            )
        )

    attrs = dict(exporter.get_finished_spans()[0].attributes)
    assert attrs["gen_ai.prompt.0.role"] == "system"
    assert attrs["gen_ai.prompt.1.role"] == "user"
    prompts = json.loads(attrs["gen_ai.prompt"])
    assert [p["role"] for p in prompts] == ["system", "user"]


def test_unrelated_event_names_ignored(setup_providers):
    """Events outside the GenAI semconv leave the span untouched."""
    tracer, logger, exporter, _ = setup_providers
    with tracer.start_as_current_span("chat") as span:
        logger.emit(
            LogRecord(
                event_name="app.debug",
                body={"content": "not a gen_ai event"},
                context=trace.set_span_in_context(span),
            )
        )

    attrs = dict(exporter.get_finished_spans()[0].attributes)
    assert "gen_ai.prompt" not in attrs
    assert not any(k.startswith("gen_ai.prompt.") for k in attrs)


def test_non_dict_body_ignored(setup_providers):
    """A GenAI event whose body is not a dict is dropped rather than crashing."""
    tracer, logger, exporter, _ = setup_providers
    with tracer.start_as_current_span("chat") as span:
        logger.emit(
            LogRecord(
                event_name="gen_ai.user.message",
                body="plain string",
                context=trace.set_span_in_context(span),
            )
        )

    attrs = dict(exporter.get_finished_spans()[0].attributes)
    assert "gen_ai.prompt" not in attrs


def test_truncation_applied_on_long_content(setup_providers):
    """Content longer than max_content_length is cut and marked ...[truncated]."""
    tracer, logger, exporter, _ = setup_providers
    big = "x" * 500
    with tracer.start_as_current_span("chat") as span:
        logger.emit(
            LogRecord(
                event_name="gen_ai.user.message",
                body={"role": "user", "content": big},
                context=trace.set_span_in_context(span),
            )
        )

    content = dict(exporter.get_finished_spans()[0].attributes)["gen_ai.prompt.0.content"]
    assert content.endswith("...[truncated]")
    # 200-char fixture cap plus the literal truncation marker.
    assert len(content) <= 200 + len("...[truncated]")


def test_cleanup_releases_state_after_span_ends(setup_providers):
    """Last9SpanProcessor.on_end calls cleanup_span, releasing the bridge's per-span state."""
    tracer, logger, exporter, bridge = setup_providers
    with tracer.start_as_current_span("chat") as span:
        span_id = span.get_span_context().span_id
        logger.emit(
            LogRecord(
                event_name="gen_ai.user.message",
                body={"role": "user", "content": "hi"},
                context=trace.set_span_in_context(span),
            )
        )
        # State exists while the span is still open...
        assert span_id in bridge._state

    # ...and is gone once the span has ended.
    assert span_id not in bridge._state


def test_tool_message_captures_tool_call_id(setup_providers):
    """gen_ai.tool.message surfaces body['id'] as the tool_call.id attribute."""
    tracer, logger, exporter, _ = setup_providers
    with tracer.start_as_current_span("chat") as span:
        logger.emit(
            LogRecord(
                event_name="gen_ai.tool.message",
                body={"role": "tool", "content": "sunny", "id": "call_abc"},
                context=trace.set_span_in_context(span),
            )
        )

    attrs = dict(exporter.get_finished_spans()[0].attributes)
    assert attrs["gen_ai.prompt.0.role"] == "tool"
    assert attrs["gen_ai.prompt.0.content"] == "sunny"
    assert attrs["gen_ai.prompt.0.tool_call.id"] == "call_abc"


def test_no_active_span_drops_event(setup_providers):
    """With no current span, the event is silently dropped and no span is created."""
    _, logger, exporter, _ = setup_providers
    logger.emit(
        LogRecord(
            event_name="gen_ai.user.message",
            body={"role": "user", "content": "orphan"},
        )
    )
    assert exporter.get_finished_spans() == ()