Streaming
Trace streaming LLM responses.
Risicare provides utilities for tracing streaming LLM responses with proper chunk tracking and span attribution.
Why Streaming Needs Special Handling
Standard contextvars do not propagate across yield boundaries in async generators (a Python limitation, PEP 568). The streaming utilities use the span registry for ID-based lookup instead, ensuring span context is available throughout the stream.
traced_stream (Async)
Wrap async streaming responses with span-aware chunk tracking:
from risicare import traced_stream, register_span, unregister_span, get_tracer
async def generate():
tracer = get_tracer()
with tracer.start_span("llm_stream") as span:
register_span(span)
try:
response = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Write a story"}],
stream=True,
)
async for chunk in traced_stream(span.span_id, response, event_name="chunk"):
yield chunk.choices[0].delta.content or ""
finally:
unregister_span(span.span_id)
async for text in generate():
print(text, end="")Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
span_id | str | required | The span ID to track events under (from span.span_id) |
stream | AsyncIterator | required | The async iterator to wrap |
event_name | str | "chunk" | Name for chunk events recorded on the span |
traced_stream_sync (Sync)
Wrap synchronous streaming responses:
from risicare import traced_stream_sync, register_span, unregister_span, get_tracer
tracer = get_tracer()
with tracer.start_span("llm_stream") as span:
register_span(span)
try:
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Write a story"}],
stream=True,
)
for chunk in traced_stream_sync(span.span_id, response, event_name="chunk"):
print(chunk.choices[0].delta.content or "", end="")
finally:
unregister_span(span.span_id)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
span_id | str | required | The span ID to track events under |
stream | Iterator | required | The iterator to wrap |
event_name | str | "chunk" | Name for chunk events recorded on the span |
How It Works
The streaming utilities:
- Look up the span by ID from the span registry
- Track each chunk with an event on the span (chunk number and size)
- Record totals when the stream completes (
stream.total_chunks,stream.total_size)
# Span attributes after streaming completes:
{
"stream.total_chunks": 42,
"stream.total_size": 1560,
}Span Registry
The span registry allows ID-based span retrieval in contexts where contextvars do not propagate (async generators, process pools):
from risicare import register_span, get_span_by_id, unregister_span
# Register a span for later retrieval
register_span(span, ttl_seconds=60)
# Retrieve the span by ID (from any context)
same_span = get_span_by_id(span.span_id)
# Clean up when done
unregister_span(span.span_id)TTL-Based Cleanup
Registered spans have a default TTL of 60 seconds and are automatically cleaned up when expired. For long-running streams, use extend_span_ttl(span_id, additional_seconds) to keep the span alive.
Custom Streaming
For custom streaming implementations, use the tracer span context manager:
from risicare import get_tracer
async def my_streaming_function():
tracer = get_tracer()
with tracer.start_span("streaming-operation") as span:
total_tokens = 0
async for chunk in some_stream():
total_tokens += count_tokens(chunk)
span.set_attribute("tokens_so_far", total_tokens)
yield chunk
span.set_attribute("total_tokens", total_tokens)Generator Functions
Decorators and Generators
Standard decorators do not work well with generator functions. Use traced_stream with the span registry instead:
from risicare import traced_stream, register_span, unregister_span, get_tracer
# Do this
async def stream_response():
tracer = get_tracer()
with tracer.start_span("llm_stream") as span:
register_span(span)
try:
async for chunk in traced_stream(span.span_id, client.stream()):
yield chunk
finally:
unregister_span(span.span_id)Streaming Span Attributes
| Attribute | Description |
|---|---|
stream.total_chunks | Total number of chunks received |
stream.total_size | Total accumulated size across all chunks |
Configuration
Disable Content Capture
risicare.init(
trace_content=False # Don't capture streaming content
)