Context Managers
Fine-grained tracing control with context managers.
Context managers provide fine-grained control over tracing scope and attributes.
Session Context
Group related traces into a user session:
from risicare import session_context
async def handle_request(user_id: str, request_id: str):
with session_context(
session_id=request_id,
user_id=user_id,
metadata={"source": "api"},
parent_session_id=None,
turn_number=1,
):
# All traces here belong to this session
await process_request()Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
session_id | str | required | Unique session identifier |
user_id | str | None | None | Optional user identifier |
metadata | dict | None | None | Custom session metadata |
parent_session_id | str | None | None | Link to previous turn in multi-turn conversations |
turn_number | int | 1 | Turn number in multi-turn conversation |
Async Variant
from risicare import async_session_context
async def handle_request(user_id: str, request_id: str):
async with async_session_context(
session_id=request_id,
user_id=user_id,
):
await process_request()Agent Context
Identify agent boundaries within a trace:
from risicare import agent_context
def run_planner():
with agent_context(
"planner-001", # agent_id (required, positional)
agent_name="planner",
agent_role="coordinator",
agent_type="custom",
version=1,
metadata={"model": "gpt-4o"},
):
# All spans here are attributed to this agent
plan = create_plan()
return planParameters
| Parameter | Type | Default | Description |
|---|---|---|---|
agent_id | str | required | Unique agent instance ID |
agent_name | str | None | None (falls back to agent_id) | Human-readable agent name |
agent_role | str | None | None | Agent role (orchestrator, worker, specialist) |
agent_type | str | None | None | Framework type (langgraph, crewai, autogen, custom) |
version | int | None | None | Agent version number |
metadata | dict | None | None | Custom agent metadata |
Automatic Parent Tracking
When agent contexts are nested, the inner agent automatically records the outer agent as its parent_agent_id. You do not need to set this manually.
Async Variant
from risicare import async_agent_context
async def run_worker():
async with async_agent_context(
"worker-001",
agent_name="worker",
agent_role="specialist",
):
await do_work()Phase Context
Track decision phases using the SemanticPhase enum:
from risicare import phase_context
from risicare import SemanticPhase
def process_task(task: str):
# Thinking phase
with phase_context(SemanticPhase.THINK):
analysis = analyze_task(task)
# Decision phase
with phase_context(SemanticPhase.DECIDE):
action = select_action(analysis)
# Action phase
with phase_context(SemanticPhase.ACT):
result = execute_action(action)
return resultSemanticPhase Values
| Phase | Value | Description |
|---|---|---|
SemanticPhase.THINK | "think" | Reasoning, planning, analysis |
SemanticPhase.DECIDE | "decide" | Decision making, action selection |
SemanticPhase.ACT | "act" | Action execution, tool calls |
SemanticPhase.REFLECT | "reflect" | Self-evaluation |
SemanticPhase.OBSERVE | "observe" | Environment observation, state reading |
SemanticPhase.COMMUNICATE | "communicate" | Inter-agent communication |
SemanticPhase.COORDINATE | "coordinate" | Multi-agent coordination |
Enum Required
phase_context accepts a SemanticPhase enum value, not a plain string. Use SemanticPhase.THINK instead of "think".
Span Context
Create custom spans for any operation:
from risicare import get_tracer
def complex_operation():
tracer = get_tracer()
with tracer.start_span(
"data_processing",
attributes={"batch_size": 100},
) as span:
result = process_data()
span.set_attribute("records_processed", len(result))
return resultParameters
| Parameter | Type | Description |
|---|---|---|
name | str | Span name |
kind | SpanKind | Span kind (e.g. SpanKind.INTERNAL, SpanKind.LLM_CALL) |
attributes | dict | None | Custom span attributes |
Nesting Contexts
Contexts can be nested to create rich hierarchies:
from risicare import session_context, agent_context, phase_context
from risicare import SemanticPhase
async def handle_user_request(user_id: str, request: str):
with session_context(session_id=f"sess-{user_id}"):
with agent_context("orchestrator-001", agent_name="orchestrator", agent_role="coordinator"):
with phase_context(SemanticPhase.THINK):
intent = await classify_intent(request)
with phase_context(SemanticPhase.DECIDE):
specialist = select_specialist(intent)
with agent_context("specialist-001", agent_name=specialist, agent_role="specialist"):
with phase_context(SemanticPhase.ACT):
result = await execute_task(request)
return resultAsync Support
All context managers have dedicated async variants:
from risicare import async_agent_context, async_session_context
async def async_agent():
async with async_session_context("sess-123", user_id="u-456"):
async with async_agent_context("worker-001", agent_name="async-worker"):
result = await async_operation()
return resultThread Safety
Context managers use contextvars for thread-safe propagation. The SDK automatically patches ThreadPoolExecutor so context propagates to worker threads:
import concurrent.futures
from risicare import agent_context
def worker(task_id: int):
# Context is automatically propagated from the submitting thread
agent = get_current_agent()
return process_task(task_id)
with agent_context("pool-coordinator", agent_role="coordinator"):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(worker, i) for i in range(10)]
results = [f.result() for f in futures]Process Pool Propagation
For ProcessPoolExecutor, use get_trace_context() and restore_trace_context() to explicitly propagate context across process boundaries:
from risicare import get_trace_context, restore_trace_context
ctx = get_trace_context()
def worker(trace_ctx):
with restore_trace_context(trace_ctx):
do_work()
executor.submit(worker, ctx)