Production AI applications require observability for debugging, cost tracking, and performance optimization. Iris provides telemetry hooks for instrumentation and configurable retry policies for handling transient failures.
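Both are configured when the client is constructed. A minimal sketch of the overall shape (the `LoggerHook` and the retry settings are developed in the sections that follow):

```go
// Attach telemetry hooks and a retry policy when building the client.
client := core.NewClient(provider,
    core.WithTelemetry(LoggerHook{}), // defined below
    core.WithRetryPolicy(core.NewRetryPolicy(core.RetryConfig{
        MaxRetries: 3,
        BaseDelay:  time.Second,
    })),
)
```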
Telemetry hooks receive events at key points in the request lifecycle, enabling logging, metrics, tracing, and cost tracking.
```go
type TelemetryHook interface {
    // Called before the request is sent
    OnRequestStart(RequestStartEvent)

    // Called after the request completes (success or failure)
    OnRequestEnd(RequestEndEvent)

    // Called when streaming starts
    OnStreamStart(StreamStartEvent)

    // Called for each stream chunk
    OnStreamChunk(StreamChunkEvent)

    // Called when the stream ends
    OnStreamEnd(StreamEndEvent)

    // Called when a retry is attempted
    OnRetry(RetryEvent)
}

type RequestStartEvent struct {
    RequestID   string
    Provider    string
    Model       string
    Messages    []Message
    Tools       []Tool
    Temperature float64
    MaxTokens   int
    Timestamp   time.Time
}

type RequestEndEvent struct {
    RequestID    string
    Provider     string
    Model        string
    Duration     time.Duration
    InputTokens  int
    OutputTokens int
    TotalTokens  int
    Success      bool
    Error        error
    FinishReason string
    Timestamp    time.Time
}

type StreamStartEvent struct {
    RequestID string
    Provider  string
    Model     string
    Timestamp time.Time
}

type StreamChunkEvent struct {
    RequestID string
    Chunk     StreamChunk
    Timestamp time.Time
}

type StreamEndEvent struct {
    RequestID   string
    Duration    time.Duration
    TotalChunks int
    TotalTokens int
    Success     bool
    Error       error
    Timestamp   time.Time
}

type RetryEvent struct {
    RequestID   string
    Attempt     int
    MaxAttempts int
    Error       error
    Delay       time.Duration
    Timestamp   time.Time
}
```

A basic hook that logs each request lifecycle event:

```go
package main

import (
    "context"
    "fmt"
    "log"
    "os"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
)
type LoggerHook struct{}

func (LoggerHook) OnRequestStart(e core.RequestStartEvent) {
    log.Printf("[%s] Starting request to %s/%s", e.RequestID[:8], e.Provider, e.Model)
}

func (LoggerHook) OnRequestEnd(e core.RequestEndEvent) {
    if e.Success {
        log.Printf("[%s] Completed in %s: %d tokens (in: %d, out: %d)",
            e.RequestID[:8], e.Duration, e.TotalTokens, e.InputTokens, e.OutputTokens)
    } else {
        log.Printf("[%s] Failed after %s: %v", e.RequestID[:8], e.Duration, e.Error)
    }
}

func (LoggerHook) OnStreamStart(e core.StreamStartEvent) {
    log.Printf("[%s] Stream started", e.RequestID[:8])
}

func (LoggerHook) OnStreamChunk(e core.StreamChunkEvent) {
    // Usually too noisy to log individual chunks
}

func (LoggerHook) OnStreamEnd(e core.StreamEndEvent) {
    log.Printf("[%s] Stream ended: %d chunks in %s", e.RequestID[:8], e.TotalChunks, e.Duration)
}

func (LoggerHook) OnRetry(e core.RetryEvent) {
    log.Printf("[%s] Retry %d/%d after %s: %v", e.RequestID[:8], e.Attempt, e.MaxAttempts, e.Delay, e.Error)
}
func main() {
    provider := openai.New(os.Getenv("OPENAI_API_KEY"))
    client := core.NewClient(provider, core.WithTelemetry(LoggerHook{}))

    resp, _ := client.Chat("gpt-4o").
        User("Hello!").
        GetResponse(context.Background())

    fmt.Println(resp.Output)
}
```

Register multiple hooks for different concerns:
```go
client := core.NewClient(provider,
    core.WithTelemetry(LoggerHook{}),
    core.WithTelemetry(MetricsHook{}),
    core.WithTelemetry(TracingHook{}),
    core.WithTelemetry(CostTrackingHook{}),
)
```
Export request counts, latency, token usage, retries, and active streams as Prometheus metrics:

```go
package telemetry

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"

    "github.com/petal-labs/iris/core"
)

var (
    requestsTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "iris_requests_total",
            Help: "Total number of LLM requests",
        },
        []string{"provider", "model", "status"},
    )

    requestDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "iris_request_duration_seconds",
            Help:    "Request duration in seconds",
            Buckets: prometheus.DefBuckets,
        },
        []string{"provider", "model"},
    )

    tokensTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "iris_tokens_total",
            Help: "Total tokens used",
        },
        []string{"provider", "model", "type"},
    )

    // RetryEvent and StreamEndEvent carry only the request ID, so these two
    // metrics are not broken down by provider/model.
    retriesTotal = promauto.NewCounter(
        prometheus.CounterOpts{
            Name: "iris_retries_total",
            Help: "Total retry attempts",
        },
    )

    activeStreams = promauto.NewGauge(
        prometheus.GaugeOpts{
            Name: "iris_active_streams",
            Help: "Number of active streams",
        },
    )
)
type PrometheusHook struct{}

// NewPrometheusHook returns the stateless hook; provided for symmetry with
// the other hook constructors used later in this guide.
func NewPrometheusHook() PrometheusHook { return PrometheusHook{} }

func (PrometheusHook) OnRequestStart(e core.RequestStartEvent) {
    // Track in-flight requests here if needed
}

func (PrometheusHook) OnRequestEnd(e core.RequestEndEvent) {
    status := "success"
    if !e.Success {
        status = "error"
    }

    requestsTotal.WithLabelValues(e.Provider, e.Model, status).Inc()
    requestDuration.WithLabelValues(e.Provider, e.Model).Observe(e.Duration.Seconds())
    tokensTotal.WithLabelValues(e.Provider, e.Model, "input").Add(float64(e.InputTokens))
    tokensTotal.WithLabelValues(e.Provider, e.Model, "output").Add(float64(e.OutputTokens))
}

func (PrometheusHook) OnStreamStart(e core.StreamStartEvent) {
    activeStreams.Inc()
}

func (PrometheusHook) OnStreamChunk(e core.StreamChunkEvent) {}

func (PrometheusHook) OnStreamEnd(e core.StreamEndEvent) {
    activeStreams.Dec()
}

func (PrometheusHook) OnRetry(e core.RetryEvent) {
    retriesTotal.Inc()
}
```
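Because the metrics are created with `promauto`, they register with Prometheus's default registry, so exposing them is just a matter of serving `promhttp.Handler()`. A sketch (the `telemetry` import path is a placeholder for wherever the package above lives):

```go
package main

import (
    "log"
    "net/http"
    "os"

    "github.com/prometheus/client_golang/prometheus/promhttp"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"

    "example.com/yourapp/telemetry" // placeholder import path for the package above
)

func main() {
    provider := openai.New(os.Getenv("OPENAI_API_KEY"))
    client := core.NewClient(provider, core.WithTelemetry(telemetry.NewPrometheusHook()))
    _ = client // make requests as usual; the hook updates the metrics

    // Serve the default registry on :2112/metrics for Prometheus to scrape.
    http.Handle("/metrics", promhttp.Handler())
    log.Fatal(http.ListenAndServe(":2112", nil))
}
```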
An OpenTelemetry hook can record a span per request alongside request-count and latency metrics:

```go
package telemetry

import (
    "context"
    "sync"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
    "go.opentelemetry.io/otel/trace"

    "github.com/petal-labs/iris/core"
)
type OTelHook struct {
    tracer  trace.Tracer
    meter   metric.Meter
    counter metric.Int64Counter
    latency metric.Float64Histogram

    mu    sync.Mutex // hooks may be called from concurrent requests
    spans map[string]trace.Span
}

func NewOTelHook() *OTelHook {
    tracer := otel.Tracer("iris")
    meter := otel.Meter("iris")

    counter, _ := meter.Int64Counter("iris.requests",
        metric.WithDescription("Number of LLM requests"))

    latency, _ := meter.Float64Histogram("iris.latency",
        metric.WithDescription("Request latency"),
        metric.WithUnit("s"))

    return &OTelHook{
        tracer:  tracer,
        meter:   meter,
        counter: counter,
        latency: latency,
        spans:   make(map[string]trace.Span),
    }
}
func (h *OTelHook) OnRequestStart(e core.RequestStartEvent) {
    ctx := context.Background()
    _, span := h.tracer.Start(ctx, "llm.request",
        trace.WithAttributes(
            attribute.String("provider", e.Provider),
            attribute.String("model", e.Model),
            attribute.Float64("temperature", e.Temperature),
            attribute.Int("max_tokens", e.MaxTokens),
        ),
    )

    h.mu.Lock()
    h.spans[e.RequestID] = span
    h.mu.Unlock()
}

func (h *OTelHook) OnRequestEnd(e core.RequestEndEvent) {
    h.mu.Lock()
    span, ok := h.spans[e.RequestID]
    delete(h.spans, e.RequestID)
    h.mu.Unlock()
    if !ok {
        return
    }

    span.SetAttributes(
        attribute.Int("tokens.input", e.InputTokens),
        attribute.Int("tokens.output", e.OutputTokens),
        attribute.Bool("success", e.Success),
        attribute.String("finish_reason", e.FinishReason),
    )

    if e.Error != nil {
        span.RecordError(e.Error)
    }

    span.End()

    // Record metrics
    attrs := []attribute.KeyValue{
        attribute.String("provider", e.Provider),
        attribute.String("model", e.Model),
    }

    h.counter.Add(context.Background(), 1, metric.WithAttributes(attrs...))
    h.latency.Record(context.Background(), e.Duration.Seconds(), metric.WithAttributes(attrs...))
}
func (h *OTelHook) OnStreamStart(e core.StreamStartEvent) {}
func (h *OTelHook) OnStreamChunk(e core.StreamChunkEvent) {}
func (h *OTelHook) OnStreamEnd(e core.StreamEndEvent)     {}
func (h *OTelHook) OnRetry(e core.RetryEvent)             {}
```
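The hook reports through the global otel tracer and meter providers, so spans only show up once an SDK is installed. A minimal wiring sketch using the stdout trace exporter (the SDK setup is an assumption about your environment, not part of Iris; swap in an OTLP exporter for a real collector):

```go
package main

import (
    "context"
    "log"
    "os"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"

    "example.com/yourapp/telemetry" // placeholder import path for the package above
)

func main() {
    // Export spans to stdout for demonstration purposes.
    exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
    if err != nil {
        log.Fatal(err)
    }
    tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
    defer tp.Shutdown(context.Background())
    otel.SetTracerProvider(tp)

    provider := openai.New(os.Getenv("OPENAI_API_KEY"))
    client := core.NewClient(provider, core.WithTelemetry(telemetry.NewOTelHook()))
    _ = client // requests made with this client now emit llm.request spans
}
```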
Track API costs based on token usage:

```go
package telemetry

import (
    "sync"

    "github.com/petal-labs/iris/core"
)

// Pricing per 1M tokens (example rates, check provider pricing)
var modelPricing = map[string]struct {
    InputPer1M  float64
    OutputPer1M float64
}{
    "gpt-4o":            {5.00, 15.00},
    "gpt-4o-mini":       {0.15, 0.60},
    "gpt-4-turbo":       {10.00, 30.00},
    "claude-3-opus":     {15.00, 75.00},
    "claude-3-5-sonnet": {3.00, 15.00},
    "claude-3-haiku":    {0.25, 1.25},
}
type CostTracker struct {
    mu          sync.RWMutex
    totalCost   float64
    costByModel map[string]float64
}

func NewCostTracker() *CostTracker {
    return &CostTracker{
        costByModel: make(map[string]float64),
    }
}

func (ct *CostTracker) OnRequestStart(e core.RequestStartEvent) {}

func (ct *CostTracker) OnRequestEnd(e core.RequestEndEvent) {
    pricing, ok := modelPricing[e.Model]
    if !ok {
        return
    }

    inputCost := float64(e.InputTokens) / 1_000_000 * pricing.InputPer1M
    outputCost := float64(e.OutputTokens) / 1_000_000 * pricing.OutputPer1M
    totalCost := inputCost + outputCost

    ct.mu.Lock()
    ct.totalCost += totalCost
    ct.costByModel[e.Model] += totalCost
    ct.mu.Unlock()
}

func (ct *CostTracker) OnStreamStart(e core.StreamStartEvent) {}
func (ct *CostTracker) OnStreamChunk(e core.StreamChunkEvent) {}
func (ct *CostTracker) OnStreamEnd(e core.StreamEndEvent)     {}
func (ct *CostTracker) OnRetry(e core.RetryEvent)             {}

func (ct *CostTracker) TotalCost() float64 {
    ct.mu.RLock()
    defer ct.mu.RUnlock()
    return ct.totalCost
}

func (ct *CostTracker) CostByModel() map[string]float64 {
    ct.mu.RLock()
    defer ct.mu.RUnlock()

    result := make(map[string]float64)
    for k, v := range ct.costByModel {
        result[k] = v
    }
    return result
}
```
```go
// Usage
func main() {
    tracker := NewCostTracker()
    client := core.NewClient(provider, core.WithTelemetry(tracker))

    // ... make requests ...

    fmt.Printf("Total cost: $%.4f\n", tracker.TotalCost())
    for model, cost := range tracker.CostByModel() {
        fmt.Printf("  %s: $%.4f\n", model, cost)
    }
}
```
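For a long-running service you might also report spend periodically rather than only at shutdown; a small sketch reusing the tracker from the example above:

```go
// Log accumulated spend once a minute in the background.
go func() {
    for range time.Tick(time.Minute) {
        log.Printf("LLM spend so far: $%.4f", tracker.TotalCost())
    }
}()
```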
Handle transient failures with configurable retry strategies.

```go
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        3,                      // Maximum retry attempts
    BaseDelay:         500 * time.Millisecond, // Initial delay
    MaxDelay:          30 * time.Second,       // Maximum delay cap
    BackoffMultiplier: 2.0,                    // Exponential backoff factor
    Jitter:            0.2,                    // Random jitter (±20%)
})
client := core.NewClient(provider, core.WithRetryPolicy(policy))
```

Different backoff strategies come from the same configuration:

```go
// Exponential backoff (default)
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        5,
    BaseDelay:         1 * time.Second,
    BackoffMultiplier: 2.0, // 1s, 2s, 4s, 8s, 16s
})
// Constant delay
policy = core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        5,
    BaseDelay:         2 * time.Second,
    BackoffMultiplier: 1.0, // 2s, 2s, 2s, 2s, 2s
})
// Constant delay with jitter
policy = core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        5,
    BaseDelay:         1 * time.Second,
    BackoffMultiplier: 1.0,
    Jitter:            0.5, // 0.5s to 1.5s, chosen at random
})
```
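To make the knobs concrete, the helper below sketches how a per-attempt delay could be derived from these fields. It is an illustration only, not Iris's actual implementation, and assumes the standard `math` and `math/rand` packages:

```go
// delayForAttempt derives a retry delay from the documented RetryConfig
// fields: exponential growth from BaseDelay, a hard cap at MaxDelay, and
// random jitter of ±Jitter around the result.
func delayForAttempt(cfg core.RetryConfig, attempt int) time.Duration {
    d := float64(cfg.BaseDelay) * math.Pow(cfg.BackoffMultiplier, float64(attempt-1))
    if cfg.MaxDelay > 0 && d > float64(cfg.MaxDelay) {
        d = float64(cfg.MaxDelay)
    }
    // Jitter 0.2 scales the delay by a random factor in [0.8, 1.2].
    factor := 1 + cfg.Jitter*(2*rand.Float64()-1)
    return time.Duration(d * factor)
}
```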
Iris automatically retries these errors:

```go
// Retried automatically
core.ErrRateLimited     // 429 Too Many Requests
core.ErrServerError     // 5xx server errors
core.ErrTimeout         // Request timeouts
core.ErrConnectionReset // Network issues

// Not retried (permanent failures)
core.ErrAuthentication  // 401/403 auth errors
core.ErrBadRequest      // 400 invalid request
core.ErrNotFound        // 404 resource not found
core.ErrContentFiltered // Safety filter triggered
```

You can override the default decision with a custom ShouldRetry predicate:

```go
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries: 3,
    BaseDelay:  1 * time.Second,
    ShouldRetry: func(err error) bool {
        // Custom logic to determine whether an error is retryable
        var rateLimitErr *core.RateLimitError
        if errors.As(err, &rateLimitErr) {
            // Only retry if under the retry budget
            return rateLimitErr.RetryAfter < 60*time.Second
        }
        var serverErr *core.ServerError
        if errors.As(err, &serverErr) {
            // Only retry 502/503, not 500
            return serverErr.StatusCode == 502 || serverErr.StatusCode == 503
        }
        return false
    },
})
```

To honor the provider's own backoff hints, respect the Retry-After header:

```go
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        3,
    BaseDelay:         1 * time.Second,
    RespectRetryAfter: true, // Use the server's Retry-After header when available
})
```

Prevent cascading failures by stopping requests when a provider is unhealthy.
```go
package resilience

import (
    "errors"
    "sync"
    "time"
)

type State int

const (
    StateClosed   State = iota // Normal operation
    StateOpen                  // Failing, reject requests
    StateHalfOpen              // Testing if recovered
)

type CircuitBreaker struct {
    mu               sync.RWMutex
    state            State
    failures         int
    successes        int
    lastFailure      time.Time
    failureThreshold int
    successThreshold int
    timeout          time.Duration
}

func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:            StateClosed,
        failureThreshold: failureThreshold,
        successThreshold: successThreshold,
        timeout:          timeout,
    }
}

var ErrCircuitOpen = errors.New("circuit breaker is open")
func (cb *CircuitBreaker) Allow() error {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateClosed:
        return nil

    case StateOpen:
        // Check whether the timeout has passed
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = StateHalfOpen
            cb.successes = 0
            return nil
        }
        return ErrCircuitOpen

    case StateHalfOpen:
        return nil
    }

    return nil
}
func (cb *CircuitBreaker) RecordSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    switch cb.state {
    case StateClosed:
        cb.failures = 0

    case StateHalfOpen:
        cb.successes++
        if cb.successes >= cb.successThreshold {
            cb.state = StateClosed
            cb.failures = 0
        }
    }
}
func (cb *CircuitBreaker) RecordFailure() {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    cb.lastFailure = time.Now()

    switch cb.state {
    case StateClosed:
        cb.failures++
        if cb.failures >= cb.failureThreshold {
            cb.state = StateOpen
        }

    case StateHalfOpen:
        cb.state = StateOpen
    }
}
func (cb *CircuitBreaker) State() State {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    return cb.state
}
```

Wrap the client so every request passes through the breaker:

```go
type ResilientClient struct {
    client  *core.Client
    breaker *CircuitBreaker
}

func NewResilientClient(provider core.Provider) *ResilientClient {
    return &ResilientClient{
        client: core.NewClient(provider),
        breaker: NewCircuitBreaker(
            5,              // Open after 5 failures
            3,              // Close after 3 successes
            30*time.Second, // Try again after 30s
        ),
    }
}
func (rc *ResilientClient) Chat(ctx context.Context, model, prompt string) (*core.ChatResponse, error) {
    if err := rc.breaker.Allow(); err != nil {
        return nil, fmt.Errorf("provider unavailable: %w", err)
    }

    resp, err := rc.client.Chat(model).User(prompt).GetResponse(ctx)
    if err != nil {
        rc.breaker.RecordFailure()
        return nil, err
    }

    rc.breaker.RecordSuccess()
    return resp, nil
}
```

When the primary provider is failing, fall back to a second one:

```go
type FallbackClient struct {
    primary  *ResilientClient
    fallback *ResilientClient
}
func (fc *FallbackClient) Chat(ctx context.Context, prompt string) (*core.ChatResponse, error) {
    // Try the primary provider first
    resp, err := fc.primary.Chat(ctx, "gpt-4o", prompt)
    if err == nil {
        return resp, nil
    }

    // If the primary circuit is open, try the fallback
    if errors.Is(err, ErrCircuitOpen) {
        log.Println("Primary circuit open, using fallback")
        return fc.fallback.Chat(ctx, "claude-3-5-sonnet", prompt)
    }

    // For other errors, still try the fallback
    log.Printf("Primary failed: %v, trying fallback", err)
    return fc.fallback.Chat(ctx, "claude-3-5-sonnet", prompt)
}
```
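Constructing the pair is then straightforward; a small sketch where `primaryProvider` and `fallbackProvider` stand in for whichever `core.Provider` implementations you run (for example OpenAI and Anthropic):

```go
// Build the fallback chain once at startup.
fc := &FallbackClient{
    primary:  NewResilientClient(primaryProvider),
    fallback: NewResilientClient(fallbackProvider),
}

resp, err := fc.Chat(ctx, "Summarize the attached report.")
if err != nil {
    log.Printf("both providers failed: %v", err)
}
```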
Handle different error types appropriately:

```go
func HandleError(err error) {
    // Rate limiting
    var rateLimitErr *core.RateLimitError
    if errors.As(err, &rateLimitErr) {
        log.Printf("Rate limited, retry after %v", rateLimitErr.RetryAfter)
        time.Sleep(rateLimitErr.RetryAfter)
        return
    }
    // Authentication errors
    var authErr *core.AuthenticationError
    if errors.As(err, &authErr) {
        log.Fatal("Invalid API key, please check configuration")
    }

    // Content filtering
    var filterErr *core.ContentFilterError
    if errors.As(err, &filterErr) {
        log.Printf("Content filtered: %s", filterErr.Reason)
        // Don't retry; modify the prompt instead
        return
    }

    // Context length exceeded
    var contextErr *core.ContextLengthError
    if errors.As(err, &contextErr) {
        log.Printf("Input too long, max: %d tokens", contextErr.MaxTokens)
        // Truncate the input and retry
        return
    }

    // Model not found
    var modelErr *core.ModelNotFoundError
    if errors.As(err, &modelErr) {
        log.Printf("Model %s not available", modelErr.Model)
        // Try a different model
        return
    }

    // Server errors
    var serverErr *core.ServerError
    if errors.As(err, &serverErr) {
        log.Printf("Server error %d: %s", serverErr.StatusCode, serverErr.Message)
        // Automatic retry should handle this
        return
    }

    // Timeout
    if errors.Is(err, context.DeadlineExceeded) {
        log.Println("Request timed out")
        return
    }

    // Unknown error
    log.Printf("Unknown error: %v", err)
}
```
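Call it at whatever boundary your requests are made; a minimal usage sketch (client, prompt, and ctx come from your own setup):

```go
resp, err := client.Chat("gpt-4o").User(prompt).GetResponse(ctx)
if err != nil {
    HandleError(err)
    return
}
fmt.Println(resp.Output)
```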
Implement structured logging for production:

```go
package telemetry

import (
    "encoding/json"
    "os"

    "github.com/petal-labs/iris/core"
)
type StructuredLogHook struct {
    encoder *json.Encoder
}

func NewStructuredLogHook() *StructuredLogHook {
    return &StructuredLogHook{
        encoder: json.NewEncoder(os.Stdout),
    }
}

func (h *StructuredLogHook) OnRequestStart(e core.RequestStartEvent) {
    h.encoder.Encode(map[string]any{
        "event":      "request.start",
        "request_id": e.RequestID,
        "provider":   e.Provider,
        "model":      e.Model,
        "timestamp":  e.Timestamp,
    })
}

func (h *StructuredLogHook) OnRequestEnd(e core.RequestEndEvent) {
    entry := map[string]any{
        "event":         "request.end",
        "request_id":    e.RequestID,
        "provider":      e.Provider,
        "model":         e.Model,
        "duration_ms":   e.Duration.Milliseconds(),
        "input_tokens":  e.InputTokens,
        "output_tokens": e.OutputTokens,
        "success":       e.Success,
        "timestamp":     e.Timestamp,
    }

    if e.Error != nil {
        entry["error"] = e.Error.Error()
    }

    h.encoder.Encode(entry)
}

func (h *StructuredLogHook) OnStreamStart(e core.StreamStartEvent) {
    h.encoder.Encode(map[string]any{
        "event":      "stream.start",
        "request_id": e.RequestID,
        "provider":   e.Provider,
        "model":      e.Model,
        "timestamp":  e.Timestamp,
    })
}

func (h *StructuredLogHook) OnStreamChunk(e core.StreamChunkEvent) {
    // Skip to avoid log spam
}

func (h *StructuredLogHook) OnStreamEnd(e core.StreamEndEvent) {
    h.encoder.Encode(map[string]any{
        "event":        "stream.end",
        "request_id":   e.RequestID,
        "duration_ms":  e.Duration.Milliseconds(),
        "total_chunks": e.TotalChunks,
        "total_tokens": e.TotalTokens,
        "success":      e.Success,
        "timestamp":    e.Timestamp,
    })
}
func (h *StructuredLogHook) OnRetry(e core.RetryEvent) {
    h.encoder.Encode(map[string]any{
        "event":        "request.retry",
        "request_id":   e.RequestID,
        "attempt":      e.Attempt,
        "max_attempts": e.MaxAttempts,
        "delay_ms":     e.Delay.Milliseconds(),
        "error":        e.Error.Error(),
        "timestamp":    e.Timestamp,
    })
}
```

Keep the development setup minimal and layer on telemetry and retries in production:

```go
// Development
client := core.NewClient(provider)
// Production
client = core.NewClient(provider,
    core.WithTelemetry(NewPrometheusHook()),
    core.WithTelemetry(NewStructuredLogHook()),
    core.WithRetryPolicy(retryPolicy),
)
```

Tune retry policies conservatively:

```go
// Too aggressive - may cause issues
policy := core.NewRetryPolicy(core.RetryConfig{
    MaxRetries: 10,                     // Too many retries
    BaseDelay:  100 * time.Millisecond, // Too fast
})
// Better - reasonable limits
policy = core.NewRetryPolicy(core.RetryConfig{
    MaxRetries:        3,
    BaseDelay:         1 * time.Second,
    MaxDelay:          30 * time.Second,
    BackoffMultiplier: 2.0,
    Jitter:            0.2,
})
```

Track these metrics in production: request and error counts, request latency, input/output token usage, retry counts, active streams, and cost per model (the Prometheus and cost-tracking hooks above collect all of these).
```go
// Always set timeouts
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

resp, err := client.Chat(model).User(prompt).GetResponse(ctx)
```

Putting it together, a production client might look like this:

```go
func NewProductionClient(provider core.Provider) *core.Client {
    return core.NewClient(provider,
        // Telemetry for observability
        core.WithTelemetry(NewPrometheusHook()),
        core.WithTelemetry(NewStructuredLogHook()),
        // Retry for transient failures
        core.WithRetryPolicy(core.NewRetryPolicy(core.RetryConfig{
            MaxRetries:        3,
            BaseDelay:         1 * time.Second,
            MaxDelay:          30 * time.Second,
            BackoffMultiplier: 2.0,
            RespectRetryAfter: true,
        })),
// Request timeout core.WithTimeout(60*time.Second), )}Providers Guide
Provider-specific telemetry features. Providers →
Tools Guide
Instrument tool calling. Tools →
Streaming Guide
Monitor streaming performance. Streaming →
Examples
See observability examples. Examples →