Observability
This example demonstrates comprehensive observability for PetalFlow workflows including structured logging, metrics collection, distributed tracing, and audit logging.
What You’ll Build
A fully instrumented workflow with:
- Structured logging with correlation IDs
- Prometheus metrics for latency, throughput, and errors
- OpenTelemetry tracing for distributed trace context
- Audit logging for compliance and debugging
- Real-time dashboards via event streaming
Complete Implementation
Setup and Imports
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/petal-labs/iris/providers/openai"
	"github.com/petal-labs/petalflow"
	"github.com/petal-labs/petalflow/irisadapter"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

func main() {
	// Initialize observability
	logger := setupLogger()
	metrics := setupMetrics()
	tracer := setupTracer()

	provider := openai.New(os.Getenv("OPENAI_API_KEY"))
	client := irisadapter.NewProviderAdapter(provider)

	graph := buildSampleGraph(client)

	// Create composite observer
	observer := NewCompositeObserver(logger, metrics, tracer)

	runObservedWorkflow(graph, observer)
}
```

Structured Logging
```go
func setupLogger() *slog.Logger {
	opts := &slog.HandlerOptions{
		Level: slog.LevelDebug,
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			// Customize timestamp format
			if a.Key == slog.TimeKey {
				a.Value = slog.StringValue(a.Value.Time().Format(time.RFC3339Nano))
			}
			return a
		},
	}

	handler := slog.NewJSONHandler(os.Stdout, opts)
	return slog.New(handler)
}
```
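The handler above emits JSON lines; to give every log line from a single run the same correlation ID, derive a per-run logger from it. A small sketch (the `newRunLogger` helper and the `correlation_id` field name are illustrative, reusing the workflow's trace ID as the correlation value):

```go
// newRunLogger derives a per-run logger so every log line carries the same
// correlation ID. Here the OpenTelemetry trace ID doubles as that ID.
func newRunLogger(base *slog.Logger, traceID string) *slog.Logger {
	return base.With(
		slog.String("correlation_id", traceID),
		slog.String("graph_name", "observable-workflow"),
	)
}
```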
```go
// LoggingEventHandler creates a structured logging handler
type LoggingEventHandler struct {
	logger *slog.Logger
}

func NewLoggingEventHandler(logger *slog.Logger) *LoggingEventHandler {
	return &LoggingEventHandler{logger: logger}
}

func (h *LoggingEventHandler) Handle(event petalflow.Event) {
	attrs := []slog.Attr{
		slog.String("event_kind", string(event.Kind)),
		slog.String("node_id", event.NodeID),
		slog.Time("timestamp", event.Timestamp),
	}

	// Add event-specific attributes
	if event.Duration > 0 {
		attrs = append(attrs, slog.Duration("duration", event.Duration))
	}

	for k, v := range event.Data {
		attrs = append(attrs, slog.Any(k, v))
	}

	// Get trace context if available
	if traceID, ok := event.Data["trace_id"].(string); ok {
		attrs = append(attrs, slog.String("trace_id", traceID))
	}

	switch event.Kind {
	case petalflow.EventGraphStart:
		h.logger.LogAttrs(context.Background(), slog.LevelInfo, "graph execution started", attrs...)

	case petalflow.EventGraphEnd:
		h.logger.LogAttrs(context.Background(), slog.LevelInfo, "graph execution completed", attrs...)

	case petalflow.EventNodeStart:
		h.logger.LogAttrs(context.Background(), slog.LevelDebug, "node started", attrs...)

	case petalflow.EventNodeEnd:
		h.logger.LogAttrs(context.Background(), slog.LevelInfo, "node completed", attrs...)

	case petalflow.EventNodeError:
		attrs = append(attrs, slog.String("error", event.Error.Error()))
		h.logger.LogAttrs(context.Background(), slog.LevelError, "node failed", attrs...)

	case petalflow.EventRouteDecision:
		h.logger.LogAttrs(context.Background(), slog.LevelInfo, "route decision made", attrs...)

	case petalflow.EventCacheHit:
		h.logger.LogAttrs(context.Background(), slog.LevelDebug, "cache hit", attrs...)

	case petalflow.EventCacheMiss:
		h.logger.LogAttrs(context.Background(), slog.LevelDebug, "cache miss", attrs...)

	default:
		h.logger.LogAttrs(context.Background(), slog.LevelDebug, "workflow event", attrs...)
	}
}
```

Prometheus Metrics
```go
// MetricsCollector holds Prometheus metrics
type MetricsCollector struct {
	graphExecutions   *prometheus.CounterVec
	graphDuration     *prometheus.HistogramVec
	nodeExecutions    *prometheus.CounterVec
	nodeDuration      *prometheus.HistogramVec
	nodeErrors        *prometheus.CounterVec
	routeDecisions    *prometheus.CounterVec
	cacheHits         *prometheus.CounterVec
	cacheMisses       *prometheus.CounterVec
	activeExecutions  prometheus.Gauge
	humanReviewsQueue prometheus.Gauge
}

func setupMetrics() *MetricsCollector {
	return &MetricsCollector{
		graphExecutions: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "petalflow_graph_executions_total",
				Help: "Total number of graph executions",
			},
			[]string{"graph_name", "status"},
		),
		graphDuration: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "petalflow_graph_duration_seconds",
				Help:    "Graph execution duration in seconds",
				Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 30, 60},
			},
			[]string{"graph_name"},
		),
		nodeExecutions: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "petalflow_node_executions_total",
				Help: "Total number of node executions",
			},
			[]string{"graph_name", "node_id", "node_type"},
		),
		nodeDuration: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "petalflow_node_duration_seconds",
				Help:    "Node execution duration in seconds",
				Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10},
			},
			[]string{"graph_name", "node_id"},
		),
		nodeErrors: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "petalflow_node_errors_total",
				Help: "Total number of node errors",
			},
			[]string{"graph_name", "node_id", "error_type"},
		),
		routeDecisions: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "petalflow_route_decisions_total",
				Help: "Total routing decisions by target",
			},
			[]string{"graph_name", "router_id", "target"},
		),
		cacheHits: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "petalflow_cache_hits_total",
				Help: "Total cache hits",
			},
			[]string{"graph_name", "cache_id"},
		),
		cacheMisses: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "petalflow_cache_misses_total",
				Help: "Total cache misses",
			},
			[]string{"graph_name", "cache_id"},
		),
		activeExecutions: promauto.NewGauge(
			prometheus.GaugeOpts{
				Name: "petalflow_active_executions",
				Help: "Number of currently active graph executions",
			},
		),
		humanReviewsQueue: promauto.NewGauge(
			prometheus.GaugeOpts{
				Name: "petalflow_human_reviews_pending",
				Help: "Number of items pending human review",
			},
		),
	}
}
```
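promauto registers these collectors with the default Prometheus registry, but Prometheus still needs an HTTP endpoint to scrape. A minimal sketch using promhttp (the `serveMetrics` helper and the listen address are illustrative, not part of the example above):

```go
import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveMetrics exposes the default Prometheus registry for scraping.
func serveMetrics(addr string) {
	http.Handle("/metrics", promhttp.Handler())
	go func() {
		if err := http.ListenAndServe(addr, nil); err != nil {
			log.Printf("metrics server stopped: %v", err)
		}
	}()
}
```

Calling something like `serveMetrics(":9090")` early in `main` makes the collectors above available to the dashboard queries shown at the end of this page.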
```go
// MetricsEventHandler records Prometheus metrics
type MetricsEventHandler struct {
	collector *MetricsCollector
	graphName string
	startTime time.Time
}

func NewMetricsEventHandler(collector *MetricsCollector, graphName string) *MetricsEventHandler {
	return &MetricsEventHandler{
		collector: collector,
		graphName: graphName,
	}
}

func (h *MetricsEventHandler) Handle(event petalflow.Event) {
	labels := prometheus.Labels{
		"graph_name": h.graphName,
	}

	switch event.Kind {
	case petalflow.EventGraphStart:
		h.startTime = event.Timestamp
		h.collector.activeExecutions.Inc()

	case petalflow.EventGraphEnd:
		h.collector.activeExecutions.Dec()
		duration := event.Timestamp.Sub(h.startTime).Seconds()
		h.collector.graphDuration.With(labels).Observe(duration)

		status := "success"
		if event.Error != nil {
			status = "error"
		}
		labels["status"] = status
		h.collector.graphExecutions.With(labels).Inc()

	case petalflow.EventNodeStart:
		labels["node_id"] = event.NodeID
		labels["node_type"] = event.Data["node_type"].(string)
		h.collector.nodeExecutions.With(labels).Inc()

	case petalflow.EventNodeEnd:
		labels["node_id"] = event.NodeID
		h.collector.nodeDuration.With(labels).Observe(event.Duration.Seconds())

	case petalflow.EventNodeError:
		labels["node_id"] = event.NodeID
		labels["error_type"] = categorizeError(event.Error)
		h.collector.nodeErrors.With(labels).Inc()

	case petalflow.EventRouteDecision:
		labels["router_id"] = event.NodeID
		labels["target"] = event.Data["target"].(string)
		h.collector.routeDecisions.With(labels).Inc()

	case petalflow.EventCacheHit:
		labels["cache_id"] = event.NodeID
		h.collector.cacheHits.With(labels).Inc()

	case petalflow.EventCacheMiss:
		labels["cache_id"] = event.NodeID
		h.collector.cacheMisses.With(labels).Inc()

	case petalflow.EventHumanPending:
		h.collector.humanReviewsQueue.Inc()

	case petalflow.EventHumanResolved:
		h.collector.humanReviewsQueue.Dec()
	}
}
func categorizeError(err error) string {
	errStr := err.Error()
	switch {
	case strings.Contains(errStr, "timeout"):
		return "timeout"
	case strings.Contains(errStr, "rate limit"):
		return "rate_limit"
	case strings.Contains(errStr, "validation"):
		return "validation"
	default:
		return "unknown"
	}
}
```

OpenTelemetry Tracing
```go
func setupTracer() trace.Tracer {
	// In production, configure with your tracing backend (Jaeger, Zipkin, etc.)
	return otel.Tracer("petalflow")
}
```
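The stub above returns a tracer from the global provider, which is a no-op until a real provider is installed. A hedged sketch of what production setup could look like with the OpenTelemetry SDK and the OTLP gRPC exporter (the `setupProductionTracer` name and wiring are illustrative, not part of PetalFlow):

```go
import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/trace"
)

// setupProductionTracer installs a global tracer provider that exports spans
// over OTLP gRPC. The endpoint is read from OTEL_EXPORTER_OTLP_ENDPOINT.
func setupProductionTracer(ctx context.Context) (trace.Tracer, func(context.Context) error, error) {
	exporter, err := otlptracegrpc.New(ctx)
	if err != nil {
		return nil, nil, err
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter), // batch spans before export
	)
	otel.SetTracerProvider(tp)

	// The returned shutdown func flushes buffered spans; call it on exit.
	return tp.Tracer("petalflow"), tp.Shutdown, nil
}
```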
```go
// TracingEventHandler creates OpenTelemetry spans
type TracingEventHandler struct {
	tracer   trace.Tracer
	spans    map[string]trace.Span
	rootSpan trace.Span
	rootCtx  context.Context
	mu       sync.Mutex
}

func NewTracingEventHandler(tracer trace.Tracer, ctx context.Context, graphName string) *TracingEventHandler {
	rootCtx, rootSpan := tracer.Start(ctx, graphName,
		trace.WithAttributes(
			attribute.String("graph.name", graphName),
		),
	)

	return &TracingEventHandler{
		tracer:   tracer,
		spans:    make(map[string]trace.Span),
		rootSpan: rootSpan,
		rootCtx:  rootCtx,
	}
}

func (h *TracingEventHandler) Handle(event petalflow.Event) {
	h.mu.Lock()
	defer h.mu.Unlock()

	switch event.Kind {
	case petalflow.EventNodeStart:
		_, span := h.tracer.Start(h.rootCtx, event.NodeID,
			trace.WithAttributes(
				attribute.String("node.id", event.NodeID),
			),
		)
		h.spans[event.NodeID] = span

	case petalflow.EventNodeEnd:
		if span, ok := h.spans[event.NodeID]; ok {
			span.SetAttributes(
				attribute.Int64("duration_ms", event.Duration.Milliseconds()),
			)
			span.End()
			delete(h.spans, event.NodeID)
		}

	case petalflow.EventNodeError:
		if span, ok := h.spans[event.NodeID]; ok {
			span.RecordError(event.Error)
			span.End()
			delete(h.spans, event.NodeID)
		}

	case petalflow.EventRouteDecision:
		h.rootSpan.AddEvent("route_decision",
			trace.WithAttributes(
				attribute.String("router", event.NodeID),
				attribute.String("target", event.Data["target"].(string)),
			),
		)

	case petalflow.EventGraphEnd:
		h.rootSpan.End()
	}
}

func (h *TracingEventHandler) TraceID() string {
	return h.rootSpan.SpanContext().TraceID().String()
}
```

Audit Logging
```go
// AuditEvent represents an auditable action
type AuditEvent struct {
	Timestamp  time.Time      `json:"timestamp"`
	TraceID    string         `json:"trace_id"`
	GraphName  string         `json:"graph_name"`
	EventType  string         `json:"event_type"`
	NodeID     string         `json:"node_id,omitempty"`
	UserID     string         `json:"user_id,omitempty"`
	Action     string         `json:"action"`
	Details    map[string]any `json:"details,omitempty"`
	InputHash  string         `json:"input_hash,omitempty"`
	OutputHash string         `json:"output_hash,omitempty"`
}

// AuditLogger writes audit events for compliance
type AuditLogger struct {
	writer    *json.Encoder
	graphName string
	traceID   string
	mu        sync.Mutex
}

func NewAuditLogger(graphName, traceID string) *AuditLogger {
	// In production, write to secure audit log storage
	return &AuditLogger{
		writer:    json.NewEncoder(os.Stdout),
		graphName: graphName,
		traceID:   traceID,
	}
}
```
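For a more durable destination than stdout, the same encoder can be pointed at an append-only JSON-lines file. A sketch (the `NewFileAuditLogger` constructor is illustrative; real deployments would more likely ship events to dedicated audit storage):

```go
// NewFileAuditLogger writes audit events to an append-only JSON-lines file.
func NewFileAuditLogger(path, graphName, traceID string) (*AuditLogger, error) {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
	if err != nil {
		return nil, err
	}
	return &AuditLogger{
		writer:    json.NewEncoder(f),
		graphName: graphName,
		traceID:   traceID,
	}, nil
}
```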
```go
func (a *AuditLogger) Handle(event petalflow.Event) {
	a.mu.Lock()
	defer a.mu.Unlock()

	// Only audit significant events
	var auditEvent *AuditEvent

	switch event.Kind {
	case petalflow.EventGraphStart:
		auditEvent = &AuditEvent{
			EventType: "graph_start",
			Action:    "execution_started",
			Details:   event.Data,
		}

	case petalflow.EventGraphEnd:
		status := "success"
		if event.Error != nil {
			status = "failure"
		}
		auditEvent = &AuditEvent{
			EventType: "graph_end",
			Action:    "execution_completed",
			Details: map[string]any{
				"status":   status,
				"duration": event.Duration.String(),
			},
		}

	case petalflow.EventNodeError:
		auditEvent = &AuditEvent{
			EventType: "node_error",
			NodeID:    event.NodeID,
			Action:    "error_occurred",
			Details: map[string]any{
				"error": event.Error.Error(),
			},
		}

	case petalflow.EventHumanResolved:
		auditEvent = &AuditEvent{
			EventType: "human_review",
			NodeID:    event.NodeID,
			Action:    "review_completed",
			UserID:    event.Data["reviewer_id"].(string),
			Details: map[string]any{
				"decision": event.Data["approved"],
				"notes":    event.Data["notes"],
			},
		}

	case petalflow.EventRouteDecision:
		auditEvent = &AuditEvent{
			EventType: "route_decision",
			NodeID:    event.NodeID,
			Action:    "routing_decision",
			Details: map[string]any{
				"target": event.Data["target"],
				"reason": event.Data["reason"],
			},
		}
	}

	if auditEvent != nil {
		auditEvent.Timestamp = event.Timestamp
		auditEvent.TraceID = a.traceID
		auditEvent.GraphName = a.graphName
		a.writer.Encode(auditEvent)
	}
}
```

Composite Observer
```go
// CompositeObserver combines multiple event handlers
type CompositeObserver struct {
	logger  *slog.Logger
	metrics *MetricsCollector
	tracer  trace.Tracer
}

func NewCompositeObserver(logger *slog.Logger, metrics *MetricsCollector, tracer trace.Tracer) *CompositeObserver {
	return &CompositeObserver{
		logger:  logger,
		metrics: metrics,
		tracer:  tracer,
	}
}

func (o *CompositeObserver) CreateHandler(ctx context.Context, graphName string) petalflow.EventHandler {
	// Create individual handlers from the shared logger, metrics, and tracer
	loggingHandler := NewLoggingEventHandler(o.logger)
	metricsHandler := NewMetricsEventHandler(o.metrics, graphName)
	tracingHandler := NewTracingEventHandler(o.tracer, ctx, graphName)
	auditHandler := NewAuditLogger(graphName, tracingHandler.TraceID())

	// Return composite handler
	return func(event petalflow.Event) {
		// Add trace ID to event data
		event.Data["trace_id"] = tracingHandler.TraceID()

		// Call all handlers
		loggingHandler.Handle(event)
		metricsHandler.Handle(event)
		tracingHandler.Handle(event)
		auditHandler.Handle(event)
	}
}
```

Sample Graph
```go
func buildSampleGraph(client *irisadapter.ProviderAdapter) petalflow.Graph {
	g := petalflow.NewGraph("observable-workflow")

	// Validation
	g.AddNode(petalflow.NewGuardianNode("validate", petalflow.GuardianNodeConfig{
		Checks: []petalflow.GuardCheck{
			{Var: "input", Op: petalflow.OpNotEmpty, Message: "Input required"},
		},
	}))

	// Processing with cache
	g.AddNode(petalflow.NewCacheNode("cache", petalflow.CacheNodeConfig{
		Store:    petalflow.NewMemoryCacheStore(),
		CacheKey: "process:{{.Vars.input_hash}}",
		TTL:      10 * time.Minute,
	}))

	// LLM processing
	g.AddNode(petalflow.NewLLMNode("process", client, petalflow.LLMNodeConfig{
		Model:          "gpt-4o-mini",
		PromptTemplate: "Process: {{.Vars.input}}",
		OutputKey:      "result",
	}))

	// Router
	g.AddNode(petalflow.NewRuleRouter("route", petalflow.RuleRouterConfig{
		Routes: []petalflow.RouteRule{
			{When: petalflow.RouteCondition{Var: "result.score", Op: petalflow.OpGt, Value: 0.8}, To: "high_confidence"},
		},
		Default: "low_confidence",
	}))

	// Outputs
	g.AddNode(petalflow.NewTransformNode("high_confidence", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			return inputs, nil
		},
		OutputKey: "output",
	}))
	g.AddNode(petalflow.NewTransformNode("low_confidence", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			return inputs, nil
		},
		OutputKey: "output",
	}))

	// Edges
	g.AddEdge("validate", "cache")
	g.AddEdge("cache", "process")
	g.AddEdge("process", "route")
	g.AddEdge("route", "high_confidence")
	g.AddEdge("route", "low_confidence")
	g.SetEntry("validate")

	return g
}
```

Running with Observability
```go
func runObservedWorkflow(graph petalflow.Graph, observer *CompositeObserver) {
	runtime := petalflow.NewRuntime()
	ctx := context.Background()

	env := petalflow.NewEnvelope()
	env.SetVar("input", "Sample input for processing")
	env.SetVar("input_hash", hashInput("Sample input for processing"))

	// Create composite handler
	handler := observer.CreateHandler(ctx, "observable-workflow")

	result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{
		EventHandler: handler,
	})
	if err != nil {
		slog.Error("workflow failed", "error", err)
		return
	}

	output := result.GetVar("output")
	slog.Info("workflow completed", "output", output)
}

func hashInput(input string) string {
	// Simple hash for demo; truncate safely so short inputs don't panic
	if len(input) > 8 {
		input = input[:8]
	}
	return fmt.Sprintf("%x", input)
}
```
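hashInput only hex-encodes a short prefix, which is fine for a demo but collides easily. For a real cache key you would hash the whole input, for example with crypto/sha256 (a sketch; `hashInputSHA256` is not part of the example above):

```go
import (
	"crypto/sha256"
	"encoding/hex"
)

// hashInputSHA256 returns a stable, collision-resistant cache key for the input.
func hashInputSHA256(input string) string {
	sum := sha256.Sum256([]byte(input))
	return hex.EncodeToString(sum[:])
}
```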
Example Log Output

```json
{"time":"2024-01-15T10:30:00.000Z","level":"INFO","msg":"graph execution started","event_kind":"graph_start","node_id":"","trace_id":"abc123"}
{"time":"2024-01-15T10:30:00.001Z","level":"DEBUG","msg":"node started","event_kind":"node_start","node_id":"validate","trace_id":"abc123"}
{"time":"2024-01-15T10:30:00.002Z","level":"INFO","msg":"node completed","event_kind":"node_end","node_id":"validate","duration":"1ms","trace_id":"abc123"}
{"time":"2024-01-15T10:30:00.003Z","level":"DEBUG","msg":"cache miss","event_kind":"cache_miss","node_id":"cache","trace_id":"abc123"}
{"time":"2024-01-15T10:30:00.004Z","level":"DEBUG","msg":"node started","event_kind":"node_start","node_id":"process","trace_id":"abc123"}
{"time":"2024-01-15T10:30:01.204Z","level":"INFO","msg":"node completed","event_kind":"node_end","node_id":"process","duration":"1.2s","trace_id":"abc123"}
{"time":"2024-01-15T10:30:01.205Z","level":"INFO","msg":"route decision made","event_kind":"route_decision","node_id":"route","target":"high_confidence","trace_id":"abc123"}
{"time":"2024-01-15T10:30:01.207Z","level":"INFO","msg":"graph execution completed","event_kind":"graph_end","duration":"1.207s","trace_id":"abc123"}
```

Grafana Dashboard Queries
Execution Latency (PromQL)

```promql
# P95 latency by graph
histogram_quantile(0.95, rate(petalflow_graph_duration_seconds_bucket[5m]))

# Average node latency
rate(petalflow_node_duration_seconds_sum[5m]) / rate(petalflow_node_duration_seconds_count[5m])
```

Error Rate
```promql
# Error rate by graph
sum(rate(petalflow_node_errors_total[5m])) by (graph_name) / sum(rate(petalflow_node_executions_total[5m])) by (graph_name)
```

Cache Efficiency
```promql
# Cache hit rate
sum(rate(petalflow_cache_hits_total[5m])) / (sum(rate(petalflow_cache_hits_total[5m])) + sum(rate(petalflow_cache_misses_total[5m])))
```

Routing Distribution
```promql
# Route decisions by target
sum(rate(petalflow_route_decisions_total[5m])) by (target)
```