
Streaming

Trace streaming LLM responses.

Risicare provides utilities for tracing streaming LLM responses with proper chunk tracking and span attribution.

Why Streaming Needs Special Handling

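Context variables (contextvars) are not scoped to a generator: each step of an async generator runs in the context of whichever task resumes it, so a value set before a yield is not guaranteed to be visible after it. This is a Python limitation; PEP 568, which proposed generator-aware context, was deferred. The streaming utilities instead look the span up by ID in the span registry, so the span stays reachable for the whole stream.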
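The limitation can be reproduced with the standard library alone. The sketch below is illustrative and does not use Risicare: current_span, stream, and step are hypothetical names, and the two steps of the generator are deliberately driven from two different tasks to make the effect visible.

```python
import asyncio
import contextvars

# Hypothetical variable standing in for a tracer's "current span" slot
current_span = contextvars.ContextVar("current_span", default=None)

async def stream():
    current_span.set("span-1")   # set before the first yield
    yield current_span.get()
    yield current_span.get()     # read again after the generator resumes

async def step(gen):
    # Coroutine wrapper so that each generator step can run as its own task
    return await gen.__anext__()

async def main():
    gen = stream()
    # Each task gets its own copy of the caller's context
    first = await asyncio.create_task(step(gen))
    second = await asyncio.create_task(step(gen))
    await gen.aclose()
    return first, second

print(asyncio.run(main()))  # ('span-1', None)
```

The second step runs in a task whose context never saw the set() call, so the value is gone. A lookup keyed by span ID does not depend on which context resumes the generator.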

traced_stream (Async)

Wrap async streaming responses with span-aware chunk tracking:

import asyncio

from openai import AsyncOpenAI  # assumed client; the calls below follow the OpenAI SDK
from risicare import traced_stream, register_span, unregister_span, get_tracer

client = AsyncOpenAI()

async def generate():
    tracer = get_tracer()
    with tracer.start_span("llm_stream") as span:
        # Register the span so traced_stream can look it up by ID
        register_span(span)
        try:
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": "Write a story"}],
                stream=True,
            )
            async for chunk in traced_stream(span.span_id, response, event_name="chunk"):
                yield chunk.choices[0].delta.content or ""
        finally:
            unregister_span(span.span_id)

async def main():
    # "async for" is only valid inside a coroutine
    async for text in generate():
        print(text, end="")

asyncio.run(main())

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| span_id | str | required | The span ID to track events under (from span.span_id) |
| stream | AsyncIterator | required | The async iterator to wrap |
| event_name | str | "chunk" | Name for chunk events recorded on the span |

traced_stream_sync (Sync)

Wrap synchronous streaming responses:

from openai import OpenAI  # assumed client; the calls below follow the OpenAI SDK
from risicare import traced_stream_sync, register_span, unregister_span, get_tracer

client = OpenAI()

tracer = get_tracer()
with tracer.start_span("llm_stream") as span:
    # Register the span so traced_stream_sync can look it up by ID
    register_span(span)
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Write a story"}],
            stream=True,
        )
        for chunk in traced_stream_sync(span.span_id, response, event_name="chunk"):
            print(chunk.choices[0].delta.content or "", end="")
    finally:
        unregister_span(span.span_id)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| span_id | str | required | The span ID to track events under |
| stream | Iterator | required | The iterator to wrap |
| event_name | str | "chunk" | Name for chunk events recorded on the span |

How It Works

The streaming utilities:

  1. Look up the span by ID from the span registry
  2. Track each chunk with an event on the span (chunk number and size)
  3. Record totals when the stream completes (stream.total_chunks, stream.total_size)

# Span attributes after streaming completes:
{
    "stream.total_chunks": 42,
    "stream.total_size": 1560,
}

Span Registry

The span registry allows ID-based span retrieval in contexts where contextvars do not propagate (async generators, process pools):

from risicare import register_span, get_span_by_id, unregister_span
 
# Register a span for later retrieval
register_span(span, ttl_seconds=60)
 
# Retrieve the span by ID (from any context)
same_span = get_span_by_id(span.span_id)
 
# Clean up when done
unregister_span(span.span_id)

TTL-Based Cleanup

Registered spans have a default TTL of 60 seconds and are automatically cleaned up when expired. For long-running streams, use extend_span_ttl(span_id, additional_seconds) to keep the span alive.
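One way to picture TTL-based cleanup is a dictionary that stores an expiry time next to each span. The sketch below is a hypothetical model, not Risicare's registry: SketchSpanRegistry, its method names, the injectable clock, and the lazy removal on lookup are all assumptions.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

class SketchSpanRegistry:
    """Hypothetical TTL registry; illustrates the idea only."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # span_id -> (span, absolute expiry time)
        self._entries: Dict[str, Tuple[Any, float]] = {}

    def register(self, span_id: str, span: Any, ttl_seconds: float = 60.0) -> None:
        self._entries[span_id] = (span, self._clock() + ttl_seconds)

    def get(self, span_id: str) -> Optional[Any]:
        entry = self._entries.get(span_id)
        if entry is None:
            return None
        span, expires_at = entry
        if self._clock() >= expires_at:
            # Expired entries are dropped lazily, on lookup
            del self._entries[span_id]
            return None
        return span

    def extend_ttl(self, span_id: str, additional_seconds: float) -> bool:
        entry = self._entries.get(span_id)
        if entry is None:
            return False
        span, expires_at = entry
        self._entries[span_id] = (span, expires_at + additional_seconds)
        return True

    def unregister(self, span_id: str) -> None:
        self._entries.pop(span_id, None)

# Usage with a fake clock so the expiry is easy to follow
now = [0.0]
registry = SketchSpanRegistry(clock=lambda: now[0])
registry.register("a", "span-a", ttl_seconds=60)
registry.extend_ttl("a", 30)     # expiry moves from t=60 to t=90
now[0] = 80.0
print(registry.get("a"))         # span-a
now[0] = 91.0
print(registry.get("a"))         # None
```

In this model a long-running stream would call extend_ttl periodically, for example every N chunks, so the entry never expires while chunks are still arriving.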

Custom Streaming

For custom streaming implementations, use the tracer span context manager:

from risicare import get_tracer

async def my_streaming_function():
    tracer = get_tracer()
    with tracer.start_span("streaming-operation") as span:
        total_tokens = 0
        try:
            # some_stream() and count_tokens() are placeholders for your own code
            async for chunk in some_stream():
                total_tokens += count_tokens(chunk)
                span.set_attribute("tokens_so_far", total_tokens)
                yield chunk
        finally:
            # Also runs when the stream fails or the generator is closed early
            span.set_attribute("total_tokens", total_tokens)

Generator Functions

Decorators and Generators

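Decorators that wrap the function call do not work well with generator functions: calling a generator function only creates the generator object, so the wrapper returns before any chunk is produced. Use traced_stream with the span registry instead: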

from risicare import traced_stream, register_span, unregister_span, get_tracer
 
# Do this
async def stream_response():
    tracer = get_tracer()
    with tracer.start_span("llm_stream") as span:
        register_span(span)
        try:
            async for chunk in traced_stream(span.span_id, client.stream()):
                yield chunk
        finally:
            unregister_span(span.span_id)
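To see why a call-wrapping decorator misses the stream, consider the following standalone sketch. naive_traced is a hypothetical decorator written for this illustration and is not part of Risicare; the log list simply records the order of events.

```python
import asyncio
from functools import wraps

log = []

def naive_traced(func):
    """Hypothetical decorator that 'opens a span' around the call only."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        log.append("span start")
        try:
            # For a generator function this returns immediately
            # with a generator object; the body has not run yet.
            return func(*args, **kwargs)
        finally:
            log.append("span end")
    return wrapper

@naive_traced
async def stream_numbers():
    for n in range(2):
        log.append(f"chunk {n}")
        yield n

async def main():
    log.clear()
    async for _ in stream_numbers():
        pass
    return list(log)

print(asyncio.run(main()))
# ['span start', 'span end', 'chunk 0', 'chunk 1']
```

The "span" has already ended before the first chunk is produced, which is why the recommended pattern opens the span inside the generator body.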

Streaming Span Attributes

| Attribute | Description |
| --- | --- |
| stream.total_chunks | Total number of chunks received |
| stream.total_size | Total accumulated size across all chunks |

Configuration

Disable Content Capture

import risicare

risicare.init(
    trace_content=False  # Don't capture streaming content
)

Next Steps