
Context Managers

Context managers provide fine-grained control over tracing scope and attributes. Each one applies to everything traced inside its with block.

Session Context

Group related traces into a user session:

from risicare import session_context
 
async def handle_request(user_id: str, request_id: str):
    with session_context(
        session_id=request_id,
        user_id=user_id,
        metadata={"source": "api"},
        parent_session_id=None,
        turn_number=1,
    ):
        # All traces here belong to this session
        await process_request()

Parameters

Parameter          Type         Default   Description
-----------------  -----------  --------  -------------------------------------------------
session_id         str          required  Unique session identifier
user_id            str | None   None      Optional user identifier
metadata           dict | None  None      Custom session metadata
parent_session_id  str | None   None      Link to previous turn in multi-turn conversations
turn_number        int          1         Turn number in multi-turn conversation
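
The parent_session_id and turn_number parameters suggest one session per conversational turn, each linked to the previous one. A minimal sketch of building those arguments follows. The helper turn_kwargs and the "<conversation>-t<n>" ID scheme are assumptions made for illustration and are not part of the SDK.

```python
from typing import Optional


def turn_kwargs(conversation_id: str, turn_number: int, user_id: Optional[str] = None) -> dict:
    """Build keyword arguments for one turn of a multi-turn conversation.

    Hypothetical helper: the "<conversation>-t<n>" ID scheme is an assumption
    made for this sketch, not an SDK convention.
    """
    if turn_number < 1:
        raise ValueError("turn_number starts at 1")
    session_id = f"{conversation_id}-t{turn_number}"
    # The first turn has no predecessor; later turns point at the previous one.
    parent = f"{conversation_id}-t{turn_number - 1}" if turn_number > 1 else None
    return {
        "session_id": session_id,
        "user_id": user_id,
        "parent_session_id": parent,
        "turn_number": turn_number,
    }


first = turn_kwargs("conv-42", 1, user_id="u-1")
second = turn_kwargs("conv-42", 2, user_id="u-1")
```

The resulting dict could then be unpacked into the context manager, for example session_context(**second).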

Async Variant

from risicare import async_session_context
 
async def handle_request(user_id: str, request_id: str):
    async with async_session_context(
        session_id=request_id,
        user_id=user_id,
    ):
        await process_request()

Agent Context

Identify agent boundaries within a trace:

from risicare import agent_context
 
def run_planner():
    with agent_context(
        "planner-001",               # agent_id (required, positional)
        agent_name="planner",
        agent_role="coordinator",
        agent_type="custom",
        version=1,
        metadata={"model": "gpt-4o"},
    ):
        # All spans here are attributed to this agent
        plan = create_plan()
        return plan

Parameters

Parameter   Type         Default                        Description
----------  -----------  -----------------------------  ---------------------------------------------------
agent_id    str          required                       Unique agent instance ID
agent_name  str | None   None (falls back to agent_id)  Human-readable agent name
agent_role  str | None   None                           Agent role (orchestrator, worker, specialist)
agent_type  str | None   None                           Framework type (langgraph, crewai, autogen, custom)
version     int | None   None                           Agent version number
metadata    dict | None  None                           Custom agent metadata

Automatic Parent Tracking

When agent contexts are nested, the inner agent automatically records the outer agent as its parent_agent_id. You do not need to set this manually.

Async Variant

from risicare import async_agent_context
 
async def run_worker():
    async with async_agent_context(
        "worker-001",
        agent_name="worker",
        agent_role="specialist",
    ):
        await do_work()

Phase Context

Track decision phases using the SemanticPhase enum:

from risicare import phase_context
from risicare import SemanticPhase
 
def process_task(task: str):
    # Thinking phase
    with phase_context(SemanticPhase.THINK):
        analysis = analyze_task(task)
 
    # Decision phase
    with phase_context(SemanticPhase.DECIDE):
        action = select_action(analysis)
 
    # Action phase
    with phase_context(SemanticPhase.ACT):
        result = execute_action(action)
 
    return result

SemanticPhase Values

Phase                      Value          Description
-------------------------  -------------  -------------------------------------
SemanticPhase.THINK        "think"        Reasoning, planning, analysis
SemanticPhase.DECIDE       "decide"       Decision making, action selection
SemanticPhase.ACT          "act"          Action execution, tool calls
SemanticPhase.REFLECT      "reflect"      Self-evaluation
SemanticPhase.OBSERVE      "observe"      Environment observation, state reading
SemanticPhase.COMMUNICATE  "communicate"  Inter-agent communication
SemanticPhase.COORDINATE   "coordinate"   Multi-agent coordination

Enum Required

phase_context accepts a SemanticPhase enum value, not a plain string. Use SemanticPhase.THINK instead of "think".
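
If phase names arrive as plain strings, for example from configuration, they need converting to an enum member before being passed on. The sketch below shows the conversion with a local stand-in enum. SketchPhase and to_phase are hypothetical, and the lookup by value relies on standard Python Enum behaviour rather than anything specific to the SDK.

```python
from enum import Enum


class SketchPhase(str, Enum):
    """Local stand-in mirroring three of the documented phase values."""
    THINK = "think"
    DECIDE = "decide"
    ACT = "act"


def to_phase(raw: str) -> SketchPhase:
    """Convert a plain string to the enum, failing loudly on unknown names."""
    try:
        # Enum lookup by value: SketchPhase("think") -> SketchPhase.THINK
        return SketchPhase(raw.strip().lower())
    except ValueError:
        allowed = ", ".join(p.value for p in SketchPhase)
        raise ValueError(f"unknown phase {raw!r}; expected one of: {allowed}") from None


phase = to_phase("Think")
```

Validating at the boundary keeps a typo in a config file from silently producing an unlabelled phase.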

Span Context

Create custom spans for any operation:

from risicare import get_tracer
 
def complex_operation():
    tracer = get_tracer()
    with tracer.start_span(
        "data_processing",
        attributes={"batch_size": 100},
    ) as span:
        result = process_data()
        span.set_attribute("records_processed", len(result))
        return result

Parameters

Parameter   Type         Description
----------  -----------  -----------------------------------------------------
name        str          Span name
kind        SpanKind     Span kind (e.g. SpanKind.INTERNAL, SpanKind.LLM_CALL)
attributes  dict | None  Custom span attributes
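
The span pattern, where attributes known up front are passed at creation and results are attached before the block exits, can be illustrated without the SDK. SketchSpan below is a hypothetical stand-in. Its duration and error fields are assumptions for this sketch and say nothing about what the SDK's span records.

```python
import time
from typing import Any, Dict, Optional


class SketchSpan:
    """Stand-in span: collects attributes and measures wall-clock duration."""

    def __init__(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> None:
        self.name = name
        self.attributes: Dict[str, Any] = dict(attributes or {})
        self.duration_s: Optional[float] = None
        self.error: Optional[str] = None

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value

    def __enter__(self) -> "SketchSpan":
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.duration_s = time.perf_counter() - self._start
        if exc_type is not None:
            self.error = exc_type.__name__
        return False  # never swallow the exception


with SketchSpan("data_processing", attributes={"batch_size": 100}) as span:
    records = list(range(100))
    span.set_attribute("records_processed", len(records))
```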

Nesting Contexts

Contexts can be nested to create rich hierarchies:

from risicare import session_context, agent_context, phase_context
from risicare import SemanticPhase
 
async def handle_user_request(user_id: str, request: str):
    with session_context(session_id=f"sess-{user_id}"):
        with agent_context("orchestrator-001", agent_name="orchestrator", agent_role="coordinator"):
            with phase_context(SemanticPhase.THINK):
                intent = await classify_intent(request)
 
            with phase_context(SemanticPhase.DECIDE):
                specialist = select_specialist(intent)
 
            with agent_context("specialist-001", agent_name=specialist, agent_role="specialist"):
                with phase_context(SemanticPhase.ACT):
                    result = await execute_task(request)
 
        return result
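
Deep nesting pushes code far to the right. Python allows several context managers in one with statement, which behaves the same as nesting them. The sketch below demonstrates the ordering with a stand-in manager. scope is hypothetical, and it is an assumption that the SDK's context managers can be combined this way like ordinary Python context managers.

```python
from contextlib import contextmanager
from typing import Iterator, List

events: List[str] = []


@contextmanager
def scope(name: str) -> Iterator[None]:
    """Stand-in context manager that logs when it is entered and exited."""
    events.append(f"enter {name}")
    try:
        yield
    finally:
        events.append(f"exit {name}")


# One `with` statement listing several managers behaves like nested blocks:
# managers are entered left to right and exited in reverse order.
with scope("session"), scope("agent"), scope("phase"):
    events.append("work")
```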

Async Support

All context managers have dedicated async variants:

from risicare import async_agent_context, async_session_context
 
async def async_agent():
    async with async_session_context("sess-123", user_id="u-456"):
        async with async_agent_context("worker-001", agent_name="async-worker"):
            result = await async_operation()
            return result
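
Context stored in contextvars, which the Thread Safety section says the SDK uses, is isolated per asyncio task: each task runs in its own copy of the context that was current when the task was created. The standard-library sketch below shows why concurrent coroutines do not overwrite each other's agent. current_agent and run_as are hypothetical names.

```python
import asyncio
import contextvars

# Stand-in for the value an agent context would store.
current_agent = contextvars.ContextVar("current_agent", default=None)


async def run_as(agent_id: str) -> str:
    current_agent.set(agent_id)
    await asyncio.sleep(0)  # yield so the other tasks get to run in between
    return current_agent.get()


async def main() -> list:
    # gather() wraps each coroutine in a task, and each task runs in its own
    # copy of the current context, so the set() calls do not interfere.
    return await asyncio.gather(run_as("worker-1"), run_as("worker-2"), run_as("worker-3"))


results = asyncio.run(main())
```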

Thread Safety

Context managers use contextvars for thread-safe propagation. The SDK automatically patches ThreadPoolExecutor so context propagates to worker threads:

import concurrent.futures
from risicare import agent_context, get_current_agent  # get_current_agent: assumed top-level export
 
def worker(task_id: int):
    # Context is automatically propagated from the submitting thread
    agent = get_current_agent()
    return process_task(task_id)
 
with agent_context("pool-coordinator", agent_role="coordinator"):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(worker, i) for i in range(10)]
        results = [f.result() for f in futures]
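
For comparison, propagating a context variable to a pool thread by hand with the standard library looks roughly like this. submit_with_context is a hypothetical helper, and how the SDK actually patches ThreadPoolExecutor is not described on this page.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the value an agent context would store.
current_agent = contextvars.ContextVar("current_agent", default=None)


def read_agent():
    return current_agent.get()


def submit_with_context(executor, fn, *args):
    """Submit fn so that it runs inside a snapshot of the caller's context."""
    ctx = contextvars.copy_context()
    # Context.run executes fn with ctx as the active context in the worker thread.
    return executor.submit(ctx.run, fn, *args)


current_agent.set("pool-coordinator")
with ThreadPoolExecutor(max_workers=2) as executor:
    seen = submit_with_context(executor, read_agent).result()
```

The snapshot is taken at submit time, so later changes in the submitting thread are not visible to the already-queued task.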

Process Pool Propagation

For ProcessPoolExecutor, use get_trace_context() and restore_trace_context() to explicitly propagate context across process boundaries:

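from concurrent.futures import ProcessPoolExecutor
from risicare import get_trace_context, restore_trace_context

def worker(trace_ctx):
    # Re-establish the parent's trace context inside the child process
    with restore_trace_context(trace_ctx):
        do_work()

if __name__ == "__main__":
    ctx = get_trace_context()  # capture in the parent process
    with ProcessPoolExecutor() as executor:
        executor.submit(worker, ctx).result()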
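
Arguments sent to a worker process are pickled, so whatever crosses the boundary has to be plain data. The sketch below shows the general capture-and-restore idea using only the standard library, with a pickle round trip standing in for the process boundary. capture, restore_and_run, and the tracked variables are hypothetical and do not describe the format the SDK uses.

```python
import contextvars
import pickle

trace_id = contextvars.ContextVar("trace_id", default=None)
agent_id = contextvars.ContextVar("agent_id", default=None)
_TRACKED = {"trace_id": trace_id, "agent_id": agent_id}


def capture() -> dict:
    """Snapshot tracked values into a plain dict that can be pickled."""
    return {name: var.get() for name, var in _TRACKED.items()}


def restore_and_run(snapshot: dict, fn):
    """Run fn inside a fresh context populated from the snapshot."""
    def _run():
        for name, value in snapshot.items():
            _TRACKED[name].set(value)
        return fn()
    # Context() is empty, so nothing leaks in from (or out to) the caller.
    return contextvars.Context().run(_run)


trace_id.set("trace-abc")
agent_id.set("planner-001")
wire = pickle.dumps(capture())  # what would cross the process boundary
seen = restore_and_run(pickle.loads(wire), lambda: (trace_id.get(), agent_id.get()))
```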
