Merged
27 changes: 27 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,33 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.1.0] - 2026-04-20

### Added
- **`Last9LogToSpanProcessor`**: new OTel `LogRecordProcessor` that promotes
  GenAI log events emitted by `opentelemetry-instrumentation-openai-v2` (which
  follows the new GenAI semantic conventions) onto the currently active span as
  flat span attributes, span events, and indexed attributes, so the Last9 LLM
  dashboard renders prompts, completions, and tool calls.
- Flat attrs: `gen_ai.prompt`, `gen_ai.completion` (JSON arrays)
- Span events: `gen_ai.content.prompt`, `gen_ai.content.completion`
- Indexed attrs: `gen_ai.prompt.{i}.*`, `gen_ai.completion.{i}.*`
(AgentOps / Traceloop compatible)
- `Last9SpanProcessor` now accepts an optional `log_processor=` kwarg; when a
  span ends, the bridge's per-span state for that span is released.

### Fixed
- LLM dashboard now shows user/assistant/tool messages for apps using the new
GenAI semconv (openai-v2) — previously these payloads were only emitted as
log records and never reached the dashboard.

### Notes
- Python 3.14 users must pin `wrapt<2` because
`opentelemetry-instrumentation-openai-v2` 2.3b0 calls
`wrap_function_wrapper(module=..., name=..., wrapper=...)` and wrapt 2.0
renamed the first kwarg to `target=`. Without the pin, instrumentation fails
silently and no log events are emitted.

## [1.0.0] - 2026-02-14

### Added
48 changes: 48 additions & 0 deletions README.md
@@ -301,6 +301,54 @@ print(f"Cost: ${cost.total:.6f}")

**Result**: You get standard OTel attributes (automatic) + Last9 cost/workflow (manual).

### Capturing Prompts, Completions, and Tool Calls

`opentelemetry-instrumentation-openai-v2` (v2.x) follows the new OpenTelemetry
GenAI semantic conventions and emits message content, tool calls, and
completions as **OTel log events**, not as span attributes. The Last9 LLM
dashboard reads span attributes / events, so without a bridge those payloads
never reach the dashboard.

`Last9LogToSpanProcessor` listens to those log events and promotes their
payloads onto the currently active span:

- `gen_ai.prompt` (JSON array of prompt messages)
- `gen_ai.completion` (JSON array of completion choices)
- span events `gen_ai.content.prompt` / `gen_ai.content.completion`
- indexed `gen_ai.prompt.{i}.*` / `gen_ai.completion.{i}.*` (AgentOps /
Traceloop compatible)
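
For illustration only (the messages below are made up, not SDK output), here is how the flat and indexed schemes expand a short conversation:

```python
import json

# A hypothetical two-message prompt as the bridge would see it
prompts = [
    {"role": "user", "content": "What's the weather?"},
    {"role": "assistant", "content": "Let me check."},
]

# Flat scheme: one JSON-array attribute on the span
flat_attr = {"gen_ai.prompt": json.dumps(prompts, default=str)}

# Indexed scheme: one attribute per message field
indexed_attrs = {}
for i, msg in enumerate(prompts):
    indexed_attrs[f"gen_ai.prompt.{i}.role"] = msg["role"]
    indexed_attrs[f"gen_ai.prompt.{i}.content"] = msg["content"]

print(indexed_attrs["gen_ai.prompt.0.role"])  # user
```

Consumers that understand either convention can read the same conversation back from the span.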

```python
import os

from opentelemetry import trace, _logs
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.instrumentation.openai_v2 import OpenAIInstrumentor

from last9_genai import Last9SpanProcessor, Last9LogToSpanProcessor

# One bridge instance is shared by both pipelines
log_bridge = Last9LogToSpanProcessor()

tracer_provider = TracerProvider()
tracer_provider.add_span_processor(Last9SpanProcessor(log_processor=log_bridge))
trace.set_tracer_provider(tracer_provider)

logger_provider = LoggerProvider()
logger_provider.add_log_record_processor(log_bridge)
_logs.set_logger_provider(logger_provider)

# Message content capture is opt-in; set this before instrumenting
os.environ["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = "true"
OpenAIInstrumentor().instrument(logger_provider=logger_provider)
```

After this, every LLM call instrumented by `openai-v2` has its prompt and
completion content (truncated to `max_content_length`, 4096 characters by
default) available on the span.

> **Python 3.14 users**: pin `wrapt<2`. `opentelemetry-instrumentation-openai-v2`
> 2.3b0 calls `wrap_function_wrapper(module=..., name=..., wrapper=...)` and
> wrapt 2.0 renamed the first kwarg to `target=`. Without the pin,
> instrumentation fails silently and no log events are emitted.
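
One way to apply the pin (the exact version specifiers are illustrative):

```shell
pip install "opentelemetry-instrumentation-openai-v2" "wrapt<2"
```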

## Usage Examples

### Multi-Turn Conversations
7 changes: 6 additions & 1 deletion last9_genai/__init__.py
@@ -34,7 +34,7 @@
For more information, see: https://github.com/last9/python-ai-sdk
"""

__version__ = "1.0.0"
__version__ = "1.1.0"
__author__ = "Last9 Inc."
__license__ = "MIT"

@@ -66,6 +66,9 @@
# Import span processor (auto-enrichment)
from last9_genai.processor import Last9SpanProcessor

# Import log-to-span bridge for GenAI log events
from last9_genai.log_processor import Last9LogToSpanProcessor

# Import decorators (auto-tracking)
from last9_genai.decorators import observe

@@ -97,6 +100,8 @@
"clear_context",
# Span processor
"Last9SpanProcessor",
# Log-to-span bridge
"Last9LogToSpanProcessor",
# Decorators (NEW)
"observe",
]
160 changes: 160 additions & 0 deletions last9_genai/log_processor.py
@@ -0,0 +1,160 @@
"""
Log-to-span bridge for OTel GenAI semantic conventions.

OpenTelemetry's newer GenAI instrumentations (e.g. opentelemetry-instrumentation-openai-v2)
emit request messages, responses, and tool calls as OTel log events — not as span
attributes. Last9's LLM dashboard reads span attributes / events, so without this bridge
those payloads never reach the dashboard.

This processor promotes well-known GenAI log events onto the currently active span using
the flat + span-event scheme the Last9 LLM dashboard parses:
- Span attribute `gen_ai.prompt` : JSON array of prompt messages
- Span attribute `gen_ai.completion` : JSON array of completion choices
- Span event `gen_ai.content.prompt` : { content: <json array> }
- Span event `gen_ai.content.completion`: { completion: <json array> }

Indexed attributes (`gen_ai.prompt.{i}.*`) are also emitted for compatibility with
AgentOps / Traceloop-style consumers.
"""

from __future__ import annotations

import json
import threading
from typing import Any, Dict, List

from opentelemetry import trace
from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord

GEN_AI_PROMPT_EVENTS = {
"gen_ai.system.message": "system",
"gen_ai.user.message": "user",
"gen_ai.assistant.message": "assistant",
"gen_ai.tool.message": "tool",
}
GEN_AI_CHOICE_EVENT = "gen_ai.choice"


class Last9LogToSpanProcessor(LogRecordProcessor):
"""Promote GenAI log events to span attributes + events on the active span.

Writes flat JSON-array attributes (what the Last9 LLM dashboard parses) and
indexed attributes (AgentOps/Traceloop convention) so downstream renderers
in either scheme can consume the payload.
"""

def __init__(self, max_content_length: int = 4096):
self._max = max_content_length
self._state: Dict[int, Dict[str, List[dict]]] = {}
self._lock = threading.Lock()

def on_emit(self, log_record: ReadWriteLogRecord) -> None:
record = log_record.log_record
event_name = getattr(record, "event_name", None)
if not event_name:
return
if event_name != GEN_AI_CHOICE_EVENT and event_name not in GEN_AI_PROMPT_EVENTS:
return

span = trace.get_current_span()
ctx = span.get_span_context()
if not ctx.is_valid or not span.is_recording():
return

body = record.body
if not isinstance(body, dict):
return

with self._lock:
state = self._state.setdefault(ctx.span_id, {"prompts": [], "completions": []})

if event_name == GEN_AI_CHOICE_EVENT:
idx = len(state["completions"])
entry = self._build_completion_entry(body)
state["completions"].append(entry)
self._set_completion_indexed(span, idx, entry, body)
self._set_completion_flat(span, state["completions"])
else:
idx = len(state["prompts"])
default_role = GEN_AI_PROMPT_EVENTS[event_name]
entry = self._build_prompt_entry(default_role, body)
state["prompts"].append(entry)
self._set_prompt_indexed(span, idx, entry, body)
self._set_prompt_flat(span, state["prompts"])

def cleanup_span(self, span_id: int) -> None:
"""Release per-span state when the span ends (called from Last9SpanProcessor)."""
with self._lock:
self._state.pop(span_id, None)

def shutdown(self) -> None:
with self._lock:
self._state.clear()

def force_flush(self, timeout_millis: int = 30000) -> bool:
return True

def _truncate(self, value: Any) -> str:
s = value if isinstance(value, str) else json.dumps(value, default=str)
if len(s) > self._max:
return s[: self._max] + "...[truncated]"
return s

def _build_prompt_entry(self, default_role: str, body: dict) -> dict:
entry: Dict[str, Any] = {"role": body.get("role", default_role)}
content = body.get("content")
if content is not None:
entry["content"] = content
if body.get("tool_calls"):
entry["tool_calls"] = body["tool_calls"]
if body.get("id"):
entry["tool_call_id"] = body["id"]
return entry

def _build_completion_entry(self, body: dict) -> dict:
message = body.get("message") or {}
entry: Dict[str, Any] = {"role": message.get("role", "assistant")}
if message.get("content") is not None:
entry["content"] = message["content"]
if message.get("tool_calls"):
entry["tool_calls"] = message["tool_calls"]
if body.get("finish_reason") is not None:
entry["finish_reason"] = body["finish_reason"]
if body.get("index") is not None:
entry["index"] = body["index"]
return entry

def _set_prompt_indexed(self, span, idx: int, entry: dict, body: dict) -> None:
span.set_attribute(f"gen_ai.prompt.{idx}.role", entry["role"])
if "content" in entry:
span.set_attribute(f"gen_ai.prompt.{idx}.content", self._truncate(entry["content"]))
if "tool_calls" in entry:
span.set_attribute(
f"gen_ai.prompt.{idx}.tool_calls", self._truncate(entry["tool_calls"])
)
if "tool_call_id" in entry:
span.set_attribute(f"gen_ai.prompt.{idx}.tool_call.id", str(entry["tool_call_id"]))

def _set_completion_indexed(self, span, idx: int, entry: dict, body: dict) -> None:
span.set_attribute(f"gen_ai.completion.{idx}.role", entry["role"])
if "content" in entry:
span.set_attribute(f"gen_ai.completion.{idx}.content", self._truncate(entry["content"]))
if "tool_calls" in entry:
span.set_attribute(
f"gen_ai.completion.{idx}.tool_calls",
self._truncate(entry["tool_calls"]),
)
if "finish_reason" in entry:
span.set_attribute(f"gen_ai.completion.{idx}.finish_reason", entry["finish_reason"])
if "index" in entry:
span.set_attribute(f"gen_ai.completion.{idx}.index", entry["index"])

def _set_prompt_flat(self, span, prompts: List[dict]) -> None:
payload = self._truncate(prompts)
span.set_attribute("gen_ai.prompt", payload)
span.add_event("gen_ai.content.prompt", {"content": payload})

def _set_completion_flat(self, span, completions: List[dict]) -> None:
payload = self._truncate(completions)
span.set_attribute("gen_ai.completion", payload)
span.add_event("gen_ai.content.completion", {"completion": payload})
9 changes: 9 additions & 0 deletions last9_genai/processor.py
@@ -52,6 +52,7 @@ def __init__(
custom_pricing: Optional[Dict[str, ModelPricing]] = None,
enable_cost_tracking: bool = True,
workflow_tracker=None,
log_processor=None,
):
"""
Initialize the span processor.
@@ -60,10 +61,13 @@
custom_pricing: Dictionary of model pricing
enable_cost_tracking: Whether to calculate and add cost attributes
workflow_tracker: Optional workflow cost tracker instance
log_processor: Optional Last9LogToSpanProcessor whose per-span state
should be released when a span ends.
"""
self.custom_pricing = custom_pricing
self.enable_cost_tracking = enable_cost_tracking
self.workflow_tracker = workflow_tracker
self.log_processor = log_processor

def on_start(self, span: "Span", parent_context: Optional[Context] = None) -> None:
"""
@@ -87,6 +91,11 @@ def on_end(self, span: ReadableSpan) -> None:
Args:
span: The span that just ended (read-only)
"""
if self.log_processor is not None:
ctx = span.get_span_context()
if ctx.is_valid:
self.log_processor.cleanup_span(ctx.span_id)

if not span.attributes:
return

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "last9-genai"
version = "1.0.0"
version = "1.1.0"
description = "Last9 observability attributes for OpenTelemetry GenAI spans - track costs, workflows, and conversations in LLM applications"
readme = "README.md"
license = "MIT"
2 changes: 1 addition & 1 deletion setup.py
@@ -13,7 +13,7 @@

setup(
name="last9-genai",
version="1.0.0",
version="1.1.0",
author="Last9 Inc.",
author_email="hello@last9.io",
description="Last9 observability attributes for OpenTelemetry GenAI spans",