
Telemetry & Retries

Production AI applications require observability for debugging, cost tracking, and performance optimization. Iris provides telemetry hooks for instrumentation and configurable retry policies for handling transient failures.

Telemetry hooks receive events at key points in the request lifecycle, enabling logging, metrics, tracing, and cost tracking.

type TelemetryHook interface {
    // Called before the request is sent
    OnRequestStart(RequestStartEvent)
    // Called after the request completes (success or failure)
    OnRequestEnd(RequestEndEvent)
    // Called when streaming starts
    OnStreamStart(StreamStartEvent)
    // Called for each stream chunk
    OnStreamChunk(StreamChunkEvent)
    // Called when the stream ends
    OnStreamEnd(StreamEndEvent)
    // Called when a retry is attempted
    OnRetry(RetryEvent)
}

type RequestStartEvent struct {
    RequestID   string
    Provider    string
    Model       string
    Messages    []Message
    Tools       []Tool
    Temperature float64
    MaxTokens   int
    Timestamp   time.Time
}

type RequestEndEvent struct {
    RequestID    string
    Provider     string
    Model        string
    Duration     time.Duration
    InputTokens  int
    OutputTokens int
    TotalTokens  int
    Success      bool
    Error        error
    FinishReason string
    Timestamp    time.Time
}

type StreamStartEvent struct {
    RequestID string
    Provider  string
    Model     string
    Timestamp time.Time
}

type StreamChunkEvent struct {
    RequestID string
    Chunk     StreamChunk
    Timestamp time.Time
}

type StreamEndEvent struct {
    RequestID   string
    Duration    time.Duration
    TotalChunks int
    TotalTokens int
    Success     bool
    Error       error
    Timestamp   time.Time
}

type RetryEvent struct {
    RequestID   string
    Attempt     int
    MaxAttempts int
    Error       error
    Delay       time.Duration
    Timestamp   time.Time
}

A minimal logging hook, registered with the client when it is constructed:

package main

import (
    "context"
    "fmt"
    "log"
    "os"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
)

type LoggerHook struct{}

func (LoggerHook) OnRequestStart(e core.RequestStartEvent) {
    log.Printf("[%s] Starting request to %s/%s",
        e.RequestID[:8], e.Provider, e.Model)
}

func (LoggerHook) OnRequestEnd(e core.RequestEndEvent) {
    if e.Success {
        log.Printf("[%s] Completed in %s: %d tokens (in: %d, out: %d)",
            e.RequestID[:8], e.Duration, e.TotalTokens,
            e.InputTokens, e.OutputTokens)
    } else {
        log.Printf("[%s] Failed after %s: %v",
            e.RequestID[:8], e.Duration, e.Error)
    }
}

func (LoggerHook) OnStreamStart(e core.StreamStartEvent) {
    log.Printf("[%s] Stream started", e.RequestID[:8])
}

func (LoggerHook) OnStreamChunk(e core.StreamChunkEvent) {
    // Usually too noisy to log individual chunks
}

func (LoggerHook) OnStreamEnd(e core.StreamEndEvent) {
    log.Printf("[%s] Stream ended: %d chunks in %s",
        e.RequestID[:8], e.TotalChunks, e.Duration)
}

func (LoggerHook) OnRetry(e core.RetryEvent) {
    log.Printf("[%s] Retry %d/%d after %s: %v",
        e.RequestID[:8], e.Attempt, e.MaxAttempts, e.Delay, e.Error)
}

func main() {
    provider := openai.New(os.Getenv("OPENAI_API_KEY"))
    client := core.NewClient(provider, core.WithTelemetry(LoggerHook{}))

    resp, err := client.Chat("gpt-4o").
        User("Hello!").
        GetResponse(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(resp.Output)
}

Register multiple hooks for different concerns:

client := core.NewClient(provider,
    core.WithTelemetry(LoggerHook{}),
    core.WithTelemetry(MetricsHook{}),
    core.WithTelemetry(TracingHook{}),
    core.WithTelemetry(CostTrackingHook{}),
)
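
Because TelemetryHook has six methods, a hook that only cares about one event still has to satisfy the whole interface. One way to cut the boilerplate is to embed a no-op base hook and override only what you need; this embedding pattern is a sketch on our side, not something the library ships:

// NoopHook satisfies core.TelemetryHook with empty implementations.
// Embed it so custom hooks only override the events they care about.
type NoopHook struct{}

func (NoopHook) OnRequestStart(core.RequestStartEvent) {}
func (NoopHook) OnRequestEnd(core.RequestEndEvent)     {}
func (NoopHook) OnStreamStart(core.StreamStartEvent)   {}
func (NoopHook) OnStreamChunk(core.StreamChunkEvent)   {}
func (NoopHook) OnStreamEnd(core.StreamEndEvent)       {}
func (NoopHook) OnRetry(core.RetryEvent)               {}

// RetryLogger only logs retries; every other event falls through to NoopHook.
type RetryLogger struct {
    NoopHook
}

func (RetryLogger) OnRetry(e core.RetryEvent) {
    log.Printf("retry %d/%d after %v: %v", e.Attempt, e.MaxAttempts, e.Delay, e.Error)
}

Register it like any other hook: core.WithTelemetry(RetryLogger{}).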

A Prometheus hook that records request counts, latency, token usage, retries, and active streams. Because StreamEndEvent and RetryEvent carry only the request ID, the hook caches each request's provider and model so those metrics can still be labeled:

package telemetry

import (
    "sync"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"

    "github.com/petal-labs/iris/core"
)

var (
    requestsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "iris_requests_total",
            Help: "Total number of LLM requests",
        },
        []string{"provider", "model", "status"},
    )
    requestDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "iris_request_duration_seconds",
            Help:    "Request duration in seconds",
            Buckets: prometheus.DefBuckets,
        },
        []string{"provider", "model"},
    )
    tokensTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "iris_tokens_total",
            Help: "Total tokens used",
        },
        []string{"provider", "model", "type"},
    )
    retriesTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "iris_retries_total",
            Help: "Total retry attempts",
        },
        []string{"provider", "model"},
    )
    activeStreams = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "iris_active_streams",
            Help: "Number of active streams",
        },
        []string{"provider", "model"},
    )
)

type PrometheusHook struct {
    mu     sync.Mutex
    labels map[string][2]string // request ID -> {provider, model}
}

func NewPrometheusHook() *PrometheusHook {
    return &PrometheusHook{labels: make(map[string][2]string)}
}

func (h *PrometheusHook) OnRequestStart(e core.RequestStartEvent) {
    // Remember provider/model for events that only carry the request ID.
    h.mu.Lock()
    h.labels[e.RequestID] = [2]string{e.Provider, e.Model}
    h.mu.Unlock()
}

func (h *PrometheusHook) OnRequestEnd(e core.RequestEndEvent) {
    status := "success"
    if !e.Success {
        status = "error"
    }
    requestsTotal.WithLabelValues(e.Provider, e.Model, status).Inc()
    requestDuration.WithLabelValues(e.Provider, e.Model).Observe(e.Duration.Seconds())
    tokensTotal.WithLabelValues(e.Provider, e.Model, "input").Add(float64(e.InputTokens))
    tokensTotal.WithLabelValues(e.Provider, e.Model, "output").Add(float64(e.OutputTokens))

    // The request is finished; drop its cached labels.
    h.mu.Lock()
    delete(h.labels, e.RequestID)
    h.mu.Unlock()
}

func (h *PrometheusHook) OnStreamStart(e core.StreamStartEvent) {
    activeStreams.WithLabelValues(e.Provider, e.Model).Inc()
}

func (h *PrometheusHook) OnStreamChunk(e core.StreamChunkEvent) {}

func (h *PrometheusHook) OnStreamEnd(e core.StreamEndEvent) {
    provider, model := h.lookup(e.RequestID)
    activeStreams.WithLabelValues(provider, model).Dec()
}

func (h *PrometheusHook) OnRetry(e core.RetryEvent) {
    provider, model := h.lookup(e.RequestID)
    retriesTotal.WithLabelValues(provider, model).Inc()
}

func (h *PrometheusHook) lookup(requestID string) (provider, model string) {
    h.mu.Lock()
    defer h.mu.Unlock()
    l := h.labels[requestID]
    return l[0], l[1]
}

An OpenTelemetry hook that opens a span per request and records request count and latency metrics. The spans map is guarded by a mutex because hooks can be invoked from concurrent requests:

package telemetry

import (
    "context"
    "sync"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
    "go.opentelemetry.io/otel/trace"

    "github.com/petal-labs/iris/core"
)

type OTelHook struct {
    tracer  trace.Tracer
    meter   metric.Meter
    counter metric.Int64Counter
    latency metric.Float64Histogram

    mu    sync.Mutex // guards spans
    spans map[string]trace.Span
}

func NewOTelHook() *OTelHook {
    tracer := otel.Tracer("iris")
    meter := otel.Meter("iris")
    counter, _ := meter.Int64Counter("iris.requests",
        metric.WithDescription("Number of LLM requests"))
    latency, _ := meter.Float64Histogram("iris.latency",
        metric.WithDescription("Request latency"),
        metric.WithUnit("s"))
    return &OTelHook{
        tracer:  tracer,
        meter:   meter,
        counter: counter,
        latency: latency,
        spans:   make(map[string]trace.Span),
    }
}

func (h *OTelHook) OnRequestStart(e core.RequestStartEvent) {
    ctx := context.Background()
    _, span := h.tracer.Start(ctx, "llm.request",
        trace.WithAttributes(
            attribute.String("provider", e.Provider),
            attribute.String("model", e.Model),
            attribute.Float64("temperature", e.Temperature),
            attribute.Int("max_tokens", e.MaxTokens),
        ),
    )
    h.mu.Lock()
    h.spans[e.RequestID] = span
    h.mu.Unlock()
}

func (h *OTelHook) OnRequestEnd(e core.RequestEndEvent) {
    h.mu.Lock()
    span, ok := h.spans[e.RequestID]
    delete(h.spans, e.RequestID)
    h.mu.Unlock()
    if !ok {
        return
    }

    span.SetAttributes(
        attribute.Int("tokens.input", e.InputTokens),
        attribute.Int("tokens.output", e.OutputTokens),
        attribute.Bool("success", e.Success),
        attribute.String("finish_reason", e.FinishReason),
    )
    if e.Error != nil {
        span.RecordError(e.Error)
    }
    span.End()

    // Record metrics
    attrs := []attribute.KeyValue{
        attribute.String("provider", e.Provider),
        attribute.String("model", e.Model),
    }
    h.counter.Add(context.Background(), 1, metric.WithAttributes(attrs...))
    h.latency.Record(context.Background(), e.Duration.Seconds(), metric.WithAttributes(attrs...))
}

func (h *OTelHook) OnStreamStart(e core.StreamStartEvent) {}
func (h *OTelHook) OnStreamChunk(e core.StreamChunkEvent) {}
func (h *OTelHook) OnStreamEnd(e core.StreamEndEvent)     {}
func (h *OTelHook) OnRetry(e core.RetryEvent)             {}
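
The hook relies on the process-global tracer and meter providers, which are no-ops until you install real ones. A minimal sketch using the stdout trace exporter; the exporter choice is a placeholder for whatever backend you actually use:

package main

import (
    "context"
    "log"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func initTracing(ctx context.Context) func() {
    // Export spans to stdout; swap for an OTLP exporter in production.
    exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
    if err != nil {
        log.Fatal(err)
    }
    tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
    otel.SetTracerProvider(tp)

    // Flush and stop the provider on shutdown.
    return func() {
        _ = tp.Shutdown(ctx)
    }
}

Metrics need an analogous MeterProvider from go.opentelemetry.io/otel/sdk/metric.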

Track API costs based on token usage:

package telemetry

import (
    "sync"

    "github.com/petal-labs/iris/core"
)

// Pricing per 1M tokens (example rates, check provider pricing)
var modelPricing = map[string]struct {
    InputPer1M  float64
    OutputPer1M float64
}{
    "gpt-4o":            {5.00, 15.00},
    "gpt-4o-mini":       {0.15, 0.60},
    "gpt-4-turbo":       {10.00, 30.00},
    "claude-3-opus":     {15.00, 75.00},
    "claude-3-5-sonnet": {3.00, 15.00},
    "claude-3-haiku":    {0.25, 1.25},
}

type CostTracker struct {
    mu          sync.RWMutex
    totalCost   float64
    costByModel map[string]float64
}

func NewCostTracker() *CostTracker {
    return &CostTracker{
        costByModel: make(map[string]float64),
    }
}

func (ct *CostTracker) OnRequestStart(e core.RequestStartEvent) {}

func (ct *CostTracker) OnRequestEnd(e core.RequestEndEvent) {
    pricing, ok := modelPricing[e.Model]
    if !ok {
        return
    }
    inputCost := float64(e.InputTokens) / 1_000_000 * pricing.InputPer1M
    outputCost := float64(e.OutputTokens) / 1_000_000 * pricing.OutputPer1M
    totalCost := inputCost + outputCost

    ct.mu.Lock()
    ct.totalCost += totalCost
    ct.costByModel[e.Model] += totalCost
    ct.mu.Unlock()
}

func (ct *CostTracker) OnStreamStart(e core.StreamStartEvent) {}
func (ct *CostTracker) OnStreamChunk(e core.StreamChunkEvent) {}
func (ct *CostTracker) OnStreamEnd(e core.StreamEndEvent)     {}
func (ct *CostTracker) OnRetry(e core.RetryEvent)             {}

func (ct *CostTracker) TotalCost() float64 {
    ct.mu.RLock()
    defer ct.mu.RUnlock()
    return ct.totalCost
}

func (ct *CostTracker) CostByModel() map[string]float64 {
    ct.mu.RLock()
    defer ct.mu.RUnlock()
    result := make(map[string]float64)
    for k, v := range ct.costByModel {
        result[k] = v
    }
    return result
}

// Usage (provider configured as in the earlier examples)
func main() {
    tracker := NewCostTracker()
    client := core.NewClient(provider, core.WithTelemetry(tracker))

    // ... make requests ...

    fmt.Printf("Total cost: $%.4f\n", tracker.TotalCost())
    for model, cost := range tracker.CostByModel() {
        fmt.Printf("  %s: $%.4f\n", model, cost)
    }
}
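
At the example rates above, a single gpt-4o request that uses 1,200 input tokens and 300 output tokens costs 1,200 / 1,000,000 × $5.00 + 300 / 1,000,000 × $15.00 ≈ $0.0105.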

Handle transient failures with configurable retry strategies.

policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        3,                      // Maximum retry attempts
    BaseDelay:         500 * time.Millisecond, // Initial delay
    MaxDelay:          30 * time.Second,       // Maximum delay cap
    BackoffMultiplier: 2.0,                    // Exponential backoff factor
    Jitter:            0.2,                    // Random jitter (±20%)
})

client := core.NewClient(provider, core.WithRetryPolicy(policy))

// Exponential backoff (default)
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        5,
    BaseDelay:         1 * time.Second,
    BackoffMultiplier: 2.0, // 1s, 2s, 4s, 8s, 16s
})

// Constant delay
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        5,
    BaseDelay:         2 * time.Second,
    BackoffMultiplier: 1.0, // 2s, 2s, 2s, 2s, 2s
})

// Constant with jitter
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        5,
    BaseDelay:         1 * time.Second,
    BackoffMultiplier: 1.0,
    Jitter:            0.5, // 0.5s - 1.5s randomly
})
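
If you want to sanity-check a configuration before deploying it, the delay schedule can be approximated offline. The formula below (base × multiplier^attempt, capped at MaxDelay, with symmetric jitter) is our reading of the config fields above, not the library's exact implementation:

package main

import (
    "fmt"
    "math"
    "math/rand"
    "time"
)

// approxDelay estimates the wait before retry attempt n (0-based) for a config
// shaped like core.RetryConfig. The real policy may differ slightly.
func approxDelay(base, max time.Duration, multiplier, jitter float64, attempt int) time.Duration {
    d := float64(base) * math.Pow(multiplier, float64(attempt))
    d = math.Min(d, float64(max))
    // jitter=0.2 means the delay varies by ±20%.
    d *= 1 + jitter*(2*rand.Float64()-1)
    return time.Duration(d)
}

func main() {
    for attempt := 0; attempt < 5; attempt++ {
        fmt.Printf("attempt %d: ~%v\n", attempt+1,
            approxDelay(500*time.Millisecond, 30*time.Second, 2.0, 0.2, attempt).Round(time.Millisecond))
    }
}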

Iris automatically retries these errors:

// Retried automatically
core.ErrRateLimited     // 429 Too Many Requests
core.ErrServerError     // 5xx Server Errors
core.ErrTimeout         // Request timeouts
core.ErrConnectionReset // Network issues

// Not retried (permanent failures)
core.ErrAuthentication  // 401/403 Auth errors
core.ErrBadRequest      // 400 Invalid request
core.ErrNotFound        // 404 Resource not found
core.ErrContentFiltered // Safety filter triggered

Override the default classification with a custom ShouldRetry function:

policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries: 3,
    BaseDelay:  1 * time.Second,
    ShouldRetry: func(err error) bool {
        // Custom logic to determine whether an error is retryable
        var rateLimitErr *core.RateLimitError
        if errors.As(err, &rateLimitErr) {
            // Only retry if under the retry budget
            return rateLimitErr.RetryAfter < 60*time.Second
        }
        var serverErr *core.ServerError
        if errors.As(err, &serverErr) {
            // Only retry 502/503, not 500
            return serverErr.StatusCode == 502 || serverErr.StatusCode == 503
        }
        return false
    },
})

Rate-limited responses often include a Retry-After header; RespectRetryAfter tells the policy to honor it:

policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        3,
    BaseDelay:         1 * time.Second,
    RespectRetryAfter: true, // Use the server's Retry-After header when available
})

Prevent cascading failures by stopping requests when a provider is unhealthy.

package resilience

import (
    "context"
    "errors"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/petal-labs/iris/core"
)

type State int

const (
    StateClosed   State = iota // Normal operation
    StateOpen                  // Failing, reject requests
    StateHalfOpen              // Testing if recovered
)

type CircuitBreaker struct {
    mu          sync.RWMutex
    state       State
    failures    int
    successes   int
    lastFailure time.Time

    failureThreshold int
    successThreshold int
    timeout          time.Duration
}

func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:            StateClosed,
        failureThreshold: failureThreshold,
        successThreshold: successThreshold,
        timeout:          timeout,
    }
}

var ErrCircuitOpen = errors.New("circuit breaker is open")

func (cb *CircuitBreaker) Allow() error {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateClosed:
        return nil
    case StateOpen:
        // Check if the timeout has passed
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = StateHalfOpen
            cb.successes = 0
            return nil
        }
        return ErrCircuitOpen
    case StateHalfOpen:
        return nil
    }
    return nil
}

func (cb *CircuitBreaker) RecordSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateClosed:
        cb.failures = 0
    case StateHalfOpen:
        cb.successes++
        if cb.successes >= cb.successThreshold {
            cb.state = StateClosed
            cb.failures = 0
        }
    }
}

func (cb *CircuitBreaker) RecordFailure() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    cb.lastFailure = time.Now()
    switch cb.state {
    case StateClosed:
        cb.failures++
        if cb.failures >= cb.failureThreshold {
            cb.state = StateOpen
        }
    case StateHalfOpen:
        cb.state = StateOpen
    }
}

func (cb *CircuitBreaker) State() State {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    return cb.state
}

type ResilientClient struct {
    client  *core.Client
    breaker *CircuitBreaker
}

func NewResilientClient(provider core.Provider) *ResilientClient {
    return &ResilientClient{
        client: core.NewClient(provider),
        breaker: NewCircuitBreaker(
            5,              // Open after 5 failures
            3,              // Close after 3 successes
            30*time.Second, // Try again after 30s
        ),
    }
}

func (rc *ResilientClient) Chat(ctx context.Context, model, prompt string) (*core.ChatResponse, error) {
    if err := rc.breaker.Allow(); err != nil {
        return nil, fmt.Errorf("provider unavailable: %w", err)
    }

    resp, err := rc.client.Chat(model).User(prompt).GetResponse(ctx)
    if err != nil {
        rc.breaker.RecordFailure()
        return nil, err
    }

    rc.breaker.RecordSuccess()
    return resp, nil
}

type FallbackClient struct {
    primary  *ResilientClient
    fallback *ResilientClient
}

func (fc *FallbackClient) Chat(ctx context.Context, prompt string) (*core.ChatResponse, error) {
    // Try the primary provider first
    resp, err := fc.primary.Chat(ctx, "gpt-4o", prompt)
    if err == nil {
        return resp, nil
    }

    // If the primary circuit is open, try the fallback
    if errors.Is(err, ErrCircuitOpen) {
        log.Println("Primary circuit open, using fallback")
        return fc.fallback.Chat(ctx, "claude-3-5-sonnet", prompt)
    }

    // For other errors, still try the fallback
    log.Printf("Primary failed: %v, trying fallback", err)
    return fc.fallback.Chat(ctx, "claude-3-5-sonnet", prompt)
}
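
Wiring it together might look like the sketch below; the anthropic provider package and its constructor are assumptions here, mirroring the openai example above:

ctx := context.Background()

primary := NewResilientClient(openai.New(os.Getenv("OPENAI_API_KEY")))
// anthropic.New is a placeholder for whatever fallback provider constructor you use.
fallback := NewResilientClient(anthropic.New(os.Getenv("ANTHROPIC_API_KEY")))

fc := &FallbackClient{primary: primary, fallback: fallback}

resp, err := fc.Chat(ctx, "Summarize this document")
if err != nil {
    log.Fatalf("both providers failed: %v", err)
}
fmt.Println(resp.Output)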

Handle different error types appropriately:

func HandleError(err error) {
    // Rate limiting
    var rateLimitErr *core.RateLimitError
    if errors.As(err, &rateLimitErr) {
        log.Printf("Rate limited, retry after %v", rateLimitErr.RetryAfter)
        time.Sleep(rateLimitErr.RetryAfter)
        return
    }

    // Authentication errors
    var authErr *core.AuthenticationError
    if errors.As(err, &authErr) {
        log.Fatal("Invalid API key, please check configuration")
    }

    // Content filtering
    var filterErr *core.ContentFilterError
    if errors.As(err, &filterErr) {
        log.Printf("Content filtered: %s", filterErr.Reason)
        // Don't retry, modify the prompt
        return
    }

    // Context length exceeded
    var contextErr *core.ContextLengthError
    if errors.As(err, &contextErr) {
        log.Printf("Input too long, max: %d tokens", contextErr.MaxTokens)
        // Truncate input and retry
        return
    }

    // Model not found
    var modelErr *core.ModelNotFoundError
    if errors.As(err, &modelErr) {
        log.Printf("Model %s not available", modelErr.Model)
        // Try a different model
        return
    }

    // Server errors
    var serverErr *core.ServerError
    if errors.As(err, &serverErr) {
        log.Printf("Server error %d: %s", serverErr.StatusCode, serverErr.Message)
        // Automatic retry should handle this
        return
    }

    // Timeout
    if errors.Is(err, context.DeadlineExceeded) {
        log.Println("Request timed out")
        return
    }

    // Unknown error
    log.Printf("Unknown error: %v", err)
}

Implement structured logging for production:

package telemetry

import (
    "encoding/json"
    "os"

    "github.com/petal-labs/iris/core"
)

type StructuredLogHook struct {
    encoder *json.Encoder
}

func NewStructuredLogHook() *StructuredLogHook {
    return &StructuredLogHook{
        encoder: json.NewEncoder(os.Stdout),
    }
}

func (h *StructuredLogHook) OnRequestStart(e core.RequestStartEvent) {
    h.encoder.Encode(map[string]any{
        "event":      "request.start",
        "request_id": e.RequestID,
        "provider":   e.Provider,
        "model":      e.Model,
        "timestamp":  e.Timestamp,
    })
}

func (h *StructuredLogHook) OnRequestEnd(e core.RequestEndEvent) {
    entry := map[string]any{
        "event":         "request.end",
        "request_id":    e.RequestID,
        "provider":      e.Provider,
        "model":         e.Model,
        "duration_ms":   e.Duration.Milliseconds(),
        "input_tokens":  e.InputTokens,
        "output_tokens": e.OutputTokens,
        "success":       e.Success,
        "timestamp":     e.Timestamp,
    }
    if e.Error != nil {
        entry["error"] = e.Error.Error()
    }
    h.encoder.Encode(entry)
}

func (h *StructuredLogHook) OnStreamStart(e core.StreamStartEvent) {
    h.encoder.Encode(map[string]any{
        "event":      "stream.start",
        "request_id": e.RequestID,
        "provider":   e.Provider,
        "model":      e.Model,
        "timestamp":  e.Timestamp,
    })
}

func (h *StructuredLogHook) OnStreamChunk(e core.StreamChunkEvent) {
    // Skip to avoid log spam
}

func (h *StructuredLogHook) OnStreamEnd(e core.StreamEndEvent) {
    h.encoder.Encode(map[string]any{
        "event":        "stream.end",
        "request_id":   e.RequestID,
        "duration_ms":  e.Duration.Milliseconds(),
        "total_chunks": e.TotalChunks,
        "total_tokens": e.TotalTokens,
        "success":      e.Success,
        "timestamp":    e.Timestamp,
    })
}

func (h *StructuredLogHook) OnRetry(e core.RetryEvent) {
    h.encoder.Encode(map[string]any{
        "event":        "request.retry",
        "request_id":   e.RequestID,
        "attempt":      e.Attempt,
        "max_attempts": e.MaxAttempts,
        "delay_ms":     e.Delay.Milliseconds(),
        "error":        e.Error.Error(),
        "timestamp":    e.Timestamp,
    })
}

Keep development setups simple and layer telemetry and retries on in production:

// Development
client := core.NewClient(provider)

// Production
client := core.NewClient(provider,
    core.WithTelemetry(NewPrometheusHook()),
    core.WithTelemetry(NewStructuredLogHook()),
    core.WithRetryPolicy(retryPolicy),
)

Choose retry limits that protect the provider as well as your own latency budget:

// Too aggressive - amplifies load on an already struggling provider
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries: 10,                     // Too many retries
    BaseDelay:  100 * time.Millisecond, // Too fast
})

// Better - reasonable limits
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        3,
    BaseDelay:         1 * time.Second,
    MaxDelay:          30 * time.Second,
    BackoffMultiplier: 2.0,
    Jitter:            0.2,
})

Track these metrics in production:

  • Request rate: Requests per second by provider/model
  • Error rate: Failure percentage by error type
  • Latency: p50, p95, p99 response times
  • Token usage: Input/output tokens by model
  • Cost: Estimated cost by model
  • Retry rate: Retry attempts per request
  • Circuit breaker state: Health of each provider
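
If you use the Prometheus hook above, the counters and histograms still need to be scraped. A minimal sketch of exposing them over HTTP with promhttp (the port and path are up to you):

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    // Serve the default registry, which promauto registers metrics into.
    http.Handle("/metrics", promhttp.Handler())
    log.Fatal(http.ListenAndServe(":9090", nil))
}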

Always set request timeouts so a hung connection cannot stall the caller:

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

resp, err := client.Chat(model).User(prompt).GetResponse(ctx)

Putting it all together, a production client combines telemetry, retries, and a timeout:

func NewProductionClient(provider core.Provider) *core.Client {
    return core.NewClient(provider,
        // Telemetry for observability
        core.WithTelemetry(NewPrometheusHook()),
        core.WithTelemetry(NewStructuredLogHook()),

        // Retry for transient failures
        core.WithRetryPolicy(core.NewRetryPolicy(core.RetryConfig{
            MaxRetries:        3,
            BaseDelay:         1 * time.Second,
            MaxDelay:          30 * time.Second,
            BackoffMultiplier: 2.0,
            RespectRetryAfter: true,
        })),

        // Request timeout
        core.WithTimeout(60*time.Second),
    )
}

Providers Guide

Provider-specific telemetry features. Providers →

Tools Guide

Instrument tool calling. Tools →