Skip to main content
GitHub

Streaming

Trace streaming LLM responses.

Risicare provides utilities for tracing streaming LLM responses with proper chunk tracking and span attribution.

Why Streaming Needs Special Handling

Standard contextvars do not propagate across yield boundaries in async generators (a Python limitation, PEP 568). The streaming utilities use the span registry for ID-based lookup instead, ensuring span context is available throughout the stream.

traced_stream (Async)

Wrap async streaming responses with span-aware chunk tracking:

from risicare import traced_stream, register_span, unregister_span, get_tracer
 
async def generate():
    tracer = get_tracer()
    with tracer.start_span("llm_stream") as span:
        register_span(span)
        try:
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": "Write a story"}],
                stream=True,
            )
            async for chunk in traced_stream(span.span_id, response, event_name="chunk"):
                yield chunk.choices[0].delta.content or ""
        finally:
            unregister_span(span.span_id)
 
async for text in generate():
    print(text, end="")

Parameters

ParameterTypeDefaultDescription
span_idstrrequiredThe span ID to track events under (from span.span_id)
streamAsyncIteratorrequiredThe async iterator to wrap
event_namestr"chunk"Name for chunk events recorded on the span

traced_stream_sync (Sync)

Wrap synchronous streaming responses:

from risicare import traced_stream_sync, register_span, unregister_span, get_tracer
 
tracer = get_tracer()
with tracer.start_span("llm_stream") as span:
    register_span(span)
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Write a story"}],
            stream=True,
        )
        for chunk in traced_stream_sync(span.span_id, response, event_name="chunk"):
            print(chunk.choices[0].delta.content or "", end="")
    finally:
        unregister_span(span.span_id)

Parameters

ParameterTypeDefaultDescription
span_idstrrequiredThe span ID to track events under
streamIteratorrequiredThe iterator to wrap
event_namestr"chunk"Name for chunk events recorded on the span

How It Works

The streaming utilities:

  1. Look up the span by ID from the span registry
  2. Track each chunk with an event on the span (chunk number and size)
  3. Record totals when the stream completes (stream.total_chunks, stream.total_size)
# Span attributes after streaming completes:
{
    "stream.total_chunks": 42,
    "stream.total_size": 1560,
}

Span Registry

The span registry allows ID-based span retrieval in contexts where contextvars do not propagate (async generators, process pools):

from risicare import register_span, get_span_by_id, unregister_span
 
# Register a span for later retrieval
register_span(span, ttl_seconds=60)
 
# Retrieve the span by ID (from any context)
same_span = get_span_by_id(span.span_id)
 
# Clean up when done
unregister_span(span.span_id)

TTL-Based Cleanup

Registered spans have a default TTL of 60 seconds and are automatically cleaned up when expired. For long-running streams, use extend_span_ttl(span_id, additional_seconds) to keep the span alive.

Custom Streaming

For custom streaming implementations, use the tracer span context manager:

from risicare import get_tracer
 
async def my_streaming_function():
    tracer = get_tracer()
    with tracer.start_span("streaming-operation") as span:
        total_tokens = 0
        async for chunk in some_stream():
            total_tokens += count_tokens(chunk)
            span.set_attribute("tokens_so_far", total_tokens)
            yield chunk
        span.set_attribute("total_tokens", total_tokens)

Generator Functions

Decorators and Generators

Standard decorators do not work well with generator functions. Use traced_stream with the span registry instead:

from risicare import traced_stream, register_span, unregister_span, get_tracer
 
# Do this
async def stream_response():
    tracer = get_tracer()
    with tracer.start_span("llm_stream") as span:
        register_span(span)
        try:
            async for chunk in traced_stream(span.span_id, client.stream()):
                yield chunk
        finally:
            unregister_span(span.span_id)

Streaming Span Attributes

AttributeDescription
stream.total_chunksTotal number of chunks received
stream.total_sizeTotal accumulated size across all chunks

Configuration

Disable Content Capture

risicare.init(
    trace_content=False  # Don't capture streaming content
)

Advanced Span Registry Utilities

These functions are exported from risicare_core (not the top-level risicare package).

extend_span_ttl

Extend the TTL of a registered span. Use this for long-running streams that may exceed the default 60-second TTL.

from risicare_core import extend_span_ttl
 
# Returns True if span was found and TTL extended, False otherwise
extend_span_ttl(span_id: str, additional_seconds: float) -> bool
from risicare import register_span, get_tracer
from risicare_core import extend_span_ttl
 
tracer = get_tracer()
with tracer.start_span("long-stream") as span:
    register_span(span, ttl_seconds=60)
    async for chunk in long_running_stream:
        # Keep the span alive during a multi-minute stream
        extend_span_ttl(span.span_id, 60)
        yield chunk

get_span_registry_stats

Get registry statistics for debugging span lifecycle issues.

from risicare_core import get_span_registry_stats
 
stats = get_span_registry_stats() -> dict

Returns:

KeyTypeDescription
total_entriesintTotal spans in registry (active + expired)
active_entriesintSpans that have not expired
expired_entriesintSpans past their TTL (awaiting cleanup)
operation_countintTotal register/unregister operations since startup

JavaScript / TypeScript

The JS SDK provides tracedStream() for tracing async iterables. Unlike Python, Node.js AsyncLocalStorage propagates through async generators natively, so no span registry workaround is needed.

import { tracedStream } from 'risicare';
 
const stream = tracedStream(asyncIterable, { name: 'llm-stream' });
for await (const chunk of stream) {
    process.stdout.write(chunk);
}
// Span automatically records stream.chunk_count and stream.completed

You can also pass a string name directly:

const stream = tracedStream(asyncIterable, 'my-stream');

Next Steps