Streaming
Trace streaming LLM responses.
Risicare provides utilities for tracing streaming LLM responses with proper chunk tracking and span attribution.
Why Streaming Needs Special Handling
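Context variables (contextvars) do not propagate reliably across yield boundaries in async generators: each resumption runs in the context of whichever caller drives the generator (a Python limitation that the deferred PEP 568 proposed to address). The streaming utilities use the span registry for ID-based lookup instead, so the span remains reachable throughout the stream.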
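The limitation can be reproduced with the standard library alone. The following is a minimal sketch, not Risicare code: `current_span` is a hypothetical context variable standing in for "the active span", and the generator simply reports what it sees each time it is resumed.

```python
import asyncio
import contextvars

# Hypothetical context variable standing in for "the current span".
current_span = contextvars.ContextVar("current_span", default=None)


async def read_span():
    # The generator body has no context of its own: each resumption runs
    # in the context of whichever task awaits __anext__().
    while True:
        yield current_span.get()


async def main():
    gen = read_span()
    current_span.set("outer")
    first = await gen.__anext__()  # resumed by the main task

    async def resume_elsewhere():
        # This set() only affects the new task's copy of the context.
        current_span.set("other-task")
        return await gen.__anext__()

    second = await asyncio.create_task(resume_elsewhere())
    await gen.aclose()
    return first, second


observed = asyncio.run(main())
print(observed)
```

The same generator object reports a different value depending on which task resumes it, which is why an explicit span ID is a more dependable handle than ambient context.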
traced_stream (Async)
Wrap async streaming responses with span-aware chunk tracking:
import asyncio

from openai import AsyncOpenAI  # assumption: an OpenAI-style async client
from risicare import traced_stream, register_span, unregister_span, get_tracer

client = AsyncOpenAI()

async def generate():
    tracer = get_tracer()
    with tracer.start_span("llm_stream") as span:
        register_span(span)
        try:
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": "Write a story"}],
                stream=True,
            )
            async for chunk in traced_stream(span.span_id, response, event_name="chunk"):
                yield chunk.choices[0].delta.content or ""
        finally:
            unregister_span(span.span_id)

async def main():
    async for text in generate():
        print(text, end="")

asyncio.run(main())

Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| span_id | str | required | The span ID to track events under (from span.span_id) |
| stream | AsyncIterator | required | The async iterator to wrap |
| event_name | str | "chunk" | Name for chunk events recorded on the span |
traced_stream_sync (Sync)
Wrap synchronous streaming responses:
from openai import OpenAI  # assumption: an OpenAI-style sync client
from risicare import traced_stream_sync, register_span, unregister_span, get_tracer

client = OpenAI()

tracer = get_tracer()
with tracer.start_span("llm_stream") as span:
    register_span(span)
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Write a story"}],
            stream=True,
        )
        for chunk in traced_stream_sync(span.span_id, response, event_name="chunk"):
            print(chunk.choices[0].delta.content or "", end="")
    finally:
        unregister_span(span.span_id)

Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| span_id | str | required | The span ID to track events under |
| stream | Iterator | required | The iterator to wrap |
| event_name | str | "chunk" | Name for chunk events recorded on the span |
How It Works
The streaming utilities:
- Look up the span by ID from the span registry
- Track each chunk with an event on the span (chunk number and size)
- Record totals when the stream completes (stream.total_chunks, stream.total_size)
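The three steps above can be sketched in plain Python. This is an illustration under stated assumptions, not the Risicare implementation: `SketchSpan`, `_registry`, `sketch_traced_stream`, and the event fields `chunk_number` and `chunk_size` are hypothetical names, and chunk size is assumed to be `len(chunk)`.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Iterator, List, Tuple


@dataclass
class SketchSpan:
    """Hypothetical stand-in for a span; not the Risicare span class."""
    span_id: str
    events: List[Tuple[str, Dict[str, Any]]] = field(default_factory=list)
    attributes: Dict[str, Any] = field(default_factory=dict)


# Hypothetical registry: a plain dict keyed by span ID.
_registry: Dict[str, SketchSpan] = {}


def sketch_traced_stream(
    span_id: str, stream: Iterable[str], event_name: str = "chunk"
) -> Iterator[str]:
    span = _registry.get(span_id)  # 1. look up the span by ID
    total_chunks = 0
    total_size = 0
    try:
        for chunk in stream:
            total_chunks += 1
            total_size += len(chunk)
            if span is not None:
                # 2. one event per chunk, carrying its number and size
                span.events.append(
                    (event_name, {"chunk_number": total_chunks, "chunk_size": len(chunk)})
                )
            yield chunk
    finally:
        if span is not None:
            # 3. totals are written once the stream ends or is closed
            span.attributes["stream.total_chunks"] = total_chunks
            span.attributes["stream.total_size"] = total_size


span = SketchSpan(span_id="abc123")
_registry[span.span_id] = span
received = list(sketch_traced_stream(span.span_id, ["Once", " upon", " a time"]))
print(span.attributes)
```

Writing the totals in a `finally` block means they are still recorded if the consumer stops iterating early and the generator is closed.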
# Span attributes after streaming completes:
{
    "stream.total_chunks": 42,
    "stream.total_size": 1560,
}

Span Registry
The span registry allows ID-based span retrieval in contexts where contextvars do not propagate (async generators, process pools):
from risicare import register_span, get_span_by_id, unregister_span

# Register a span for later retrieval
register_span(span, ttl_seconds=60)

# Retrieve the span by ID (from any context)
same_span = get_span_by_id(span.span_id)

# Clean up when done
unregister_span(span.span_id)

TTL-Based Cleanup
Registered spans have a default TTL of 60 seconds and are automatically cleaned up when expired. For long-running streams, use extend_span_ttl(span_id, additional_seconds) to keep the span alive.
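To make the TTL behaviour concrete, here is a minimal sketch of how a TTL-based registry could work. It is not the Risicare implementation: `SketchSpanRegistry` and its methods are hypothetical, expiry is checked lazily on lookup, and extending is assumed to add seconds to the existing expiry time.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class SketchSpanRegistry:
    """Hypothetical TTL registry; not the Risicare implementation."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # span_id -> (span, expiry timestamp)
        self._entries: Dict[str, Tuple[Any, float]] = {}

    def register(self, span_id: str, span: Any, ttl_seconds: float = 60.0) -> None:
        self._entries[span_id] = (span, self._clock() + ttl_seconds)

    def get(self, span_id: str) -> Optional[Any]:
        entry = self._entries.get(span_id)
        if entry is None:
            return None
        span, expiry = entry
        if self._clock() >= expiry:
            del self._entries[span_id]  # lazily drop the expired entry
            return None
        return span

    def extend(self, span_id: str, additional_seconds: float) -> bool:
        entry = self._entries.get(span_id)
        if entry is None:
            return False
        span, expiry = entry
        self._entries[span_id] = (span, expiry + additional_seconds)
        return True

    def unregister(self, span_id: str) -> None:
        self._entries.pop(span_id, None)


# Drive the registry with a fake clock so the example is deterministic.
now = [0.0]
registry = SketchSpanRegistry(clock=lambda: now[0])
registry.register("abc123", span="my-span", ttl_seconds=60)

now[0] = 59.0
alive = registry.get("abc123")            # still within the 60 s TTL
extended = registry.extend("abc123", 60)  # expiry moves from 60 s to 120 s

now[0] = 100.0
still_alive = registry.get("abc123")      # would have expired without the extension

now[0] = 121.0
expired = registry.get("abc123")          # past the extended expiry
```

Injecting the clock keeps the sketch testable without sleeping; a real registry would also need locking if spans are registered from multiple threads.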
Custom Streaming
For custom streaming implementations, use the tracer span context manager:
from risicare import get_tracer

# some_stream() and count_tokens() are placeholders for your own stream and tokenizer
async def my_streaming_function():
    tracer = get_tracer()
    with tracer.start_span("streaming-operation") as span:
        total_tokens = 0
        async for chunk in some_stream():
            total_tokens += count_tokens(chunk)
            span.set_attribute("tokens_so_far", total_tokens)
            yield chunk
        span.set_attribute("total_tokens", total_tokens)

Generator Functions
Decorators and Generators
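The underlying problem is easy to reproduce with a plain function wrapper. In this sketch, `naive_trace` is a hypothetical decorator that logs where a span would start and end; it is not a Risicare API.

```python
import functools

log = []


def naive_trace(func):
    """Hypothetical decorator that 'opens a span' around a single call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        log.append("span start")
        try:
            # For a generator function this returns a generator object
            # immediately; none of the body has run yet.
            return func(*args, **kwargs)
        finally:
            log.append("span end")
    return wrapper


@naive_trace
def stream_words():
    for word in ["a", "b"]:
        log.append(f"yield {word}")
        yield word


words = list(stream_words())
print(log)
```

The log shows the "span" closing before the first item is produced, so nothing yielded by the generator would be attributed to it.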
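Decorators that wrap a function call do not work well with generator functions: calling a generator function returns the generator object immediately, so the wrapper finishes before any chunk is produced. Use traced_stream with the span registry instead: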
from risicare import traced_stream, register_span, unregister_span, get_tracer

# Do this
async def stream_response():
    tracer = get_tracer()
    with tracer.start_span("llm_stream") as span:
        register_span(span)
        try:
            async for chunk in traced_stream(span.span_id, client.stream()):
                yield chunk
        finally:
            unregister_span(span.span_id)

Streaming Span Attributes
| Attribute | Description |
|---|---|
| stream.total_chunks | Total number of chunks received |
| stream.total_size | Total accumulated size across all chunks |
Configuration
Disable Content Capture
import risicare

risicare.init(
    trace_content=False,  # Don't capture streaming content
)

Advanced Span Registry Utilities
These functions are exported from risicare_core (not the top-level risicare package).
extend_span_ttl
Extend the TTL of a registered span. Use this for long-running streams that may exceed the default 60-second TTL.
from risicare_core import extend_span_ttl

# Returns True if span was found and TTL extended, False otherwise
extend_span_ttl(span_id: str, additional_seconds: float) -> bool

Example:

from risicare import register_span, get_tracer
from risicare_core import extend_span_ttl

# Wrapped in an async generator so that `async for` and `yield` are valid
async def stream_with_keepalive(long_running_stream):
    tracer = get_tracer()
    with tracer.start_span("long-stream") as span:
        register_span(span, ttl_seconds=60)
        async for chunk in long_running_stream:
            # Keep the span alive during a multi-minute stream
            extend_span_ttl(span.span_id, 60)
            yield chunk

get_span_registry_stats
Get registry statistics for debugging span lifecycle issues.
from risicare_core import get_span_registry_stats

stats = get_span_registry_stats()  # returns a dict

Returns:
| Key | Type | Description |
|---|---|---|
| total_entries | int | Total spans in registry (active + expired) |
| active_entries | int | Spans that have not expired |
| expired_entries | int | Spans past their TTL (awaiting cleanup) |
| operation_count | int | Total register/unregister operations since startup |
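One way to use these statistics is a small health check over the returned dict. The sketch below relies only on the keys documented in the table; `registry_warnings`, the `max_active` threshold, and the sample values are hypothetical and chosen purely for illustration.

```python
from typing import Dict, List


def registry_warnings(stats: Dict[str, int], max_active: int = 100) -> List[str]:
    """Return human-readable warnings for a registry stats dict."""
    warnings = []
    if stats["total_entries"] != stats["active_entries"] + stats["expired_entries"]:
        warnings.append("entry counts are inconsistent")
    if stats["active_entries"] > max_active:
        # Many live spans can indicate streams that never call unregister_span().
        warnings.append(
            f"{stats['active_entries']} active spans: possible missing unregister_span() calls"
        )
    return warnings


# Made-up sample values, shaped like the documented return value.
sample = {
    "total_entries": 130,
    "active_entries": 120,
    "expired_entries": 10,
    "operation_count": 500,
}
found = registry_warnings(sample)
print(found)
```

In an application, the dict would come from get_span_registry_stats() rather than a hand-written sample.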
JavaScript / TypeScript
The JS SDK provides tracedStream() for tracing async iterables. Unlike Python, Node.js AsyncLocalStorage propagates through async generators natively, so no span registry workaround is needed.
import { tracedStream } from 'risicare';

const stream = tracedStream(asyncIterable, { name: 'llm-stream' });
for await (const chunk of stream) {
  process.stdout.write(chunk);
}
// Span automatically records stream.chunk_count and stream.completed

You can also pass a string name directly:

const stream = tracedStream(asyncIterable, 'my-stream');