
Observability

This example demonstrates comprehensive observability for PetalFlow workflows, including structured logging, metrics collection, distributed tracing, and audit logging.

The result is a fully instrumented workflow with:

  • Structured logging with correlation IDs
  • Prometheus metrics for latency, throughput, and errors
  • OpenTelemetry tracing for distributed trace context
  • Audit logging for compliance and debugging
  • Real-time dashboards via event streaming
package main

import (
    "context"
    "crypto/sha256"
    "encoding/json"
    "fmt"
    "log/slog"
    "os"
    "strings"
    "sync"
    "time"

    "github.com/petal-labs/iris/providers/openai"
    "github.com/petal-labs/petalflow"
    "github.com/petal-labs/petalflow/irisadapter"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)
func main() {
    // Initialize observability
    logger := setupLogger()
    metrics := setupMetrics()
    tracer := setupTracer()

    provider := openai.New(os.Getenv("OPENAI_API_KEY"))
    client := irisadapter.NewProviderAdapter(provider)
    graph := buildSampleGraph(client)

    // Create composite observer
    observer := NewCompositeObserver(logger, metrics, tracer)
    runObservedWorkflow(graph, observer)
}
func setupLogger() *slog.Logger {
    opts := &slog.HandlerOptions{
        Level: slog.LevelDebug,
        ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
            // Customize timestamp format
            if a.Key == slog.TimeKey {
                a.Value = slog.StringValue(a.Value.Time().Format(time.RFC3339Nano))
            }
            return a
        },
    }
    handler := slog.NewJSONHandler(os.Stdout, opts)
    return slog.New(handler)
}
// LoggingEventHandler logs workflow events as structured records
type LoggingEventHandler struct {
    logger *slog.Logger
}

func NewLoggingEventHandler(logger *slog.Logger) *LoggingEventHandler {
    return &LoggingEventHandler{logger: logger}
}

func (h *LoggingEventHandler) Handle(event petalflow.Event) {
    attrs := []slog.Attr{
        slog.String("event_kind", string(event.Kind)),
        slog.String("node_id", event.NodeID),
        slog.Time("timestamp", event.Timestamp),
    }
    // Add event-specific attributes
    if event.Duration > 0 {
        attrs = append(attrs, slog.Duration("duration", event.Duration))
    }
    for k, v := range event.Data {
        if k == "trace_id" {
            continue // promoted to a dedicated attribute below
        }
        attrs = append(attrs, slog.Any(k, v))
    }
    // Add trace context if available
    if traceID, ok := event.Data["trace_id"].(string); ok {
        attrs = append(attrs, slog.String("trace_id", traceID))
    }

    switch event.Kind {
    case petalflow.EventGraphStart:
        h.logger.LogAttrs(context.Background(), slog.LevelInfo, "graph execution started", attrs...)
    case petalflow.EventGraphEnd:
        h.logger.LogAttrs(context.Background(), slog.LevelInfo, "graph execution completed", attrs...)
    case petalflow.EventNodeStart:
        h.logger.LogAttrs(context.Background(), slog.LevelDebug, "node started", attrs...)
    case petalflow.EventNodeEnd:
        h.logger.LogAttrs(context.Background(), slog.LevelInfo, "node completed", attrs...)
    case petalflow.EventNodeError:
        attrs = append(attrs, slog.String("error", event.Error.Error()))
        h.logger.LogAttrs(context.Background(), slog.LevelError, "node failed", attrs...)
    case petalflow.EventRouteDecision:
        h.logger.LogAttrs(context.Background(), slog.LevelInfo, "route decision made", attrs...)
    case petalflow.EventCacheHit:
        h.logger.LogAttrs(context.Background(), slog.LevelDebug, "cache hit", attrs...)
    case petalflow.EventCacheMiss:
        h.logger.LogAttrs(context.Background(), slog.LevelDebug, "cache miss", attrs...)
    default:
        h.logger.LogAttrs(context.Background(), slog.LevelDebug, "workflow event", attrs...)
    }
}
// MetricsCollector holds Prometheus metrics
type MetricsCollector struct {
    graphExecutions   *prometheus.CounterVec
    graphDuration     *prometheus.HistogramVec
    nodeExecutions    *prometheus.CounterVec
    nodeDuration      *prometheus.HistogramVec
    nodeErrors        *prometheus.CounterVec
    routeDecisions    *prometheus.CounterVec
    cacheHits         *prometheus.CounterVec
    cacheMisses       *prometheus.CounterVec
    activeExecutions  prometheus.Gauge
    humanReviewsQueue prometheus.Gauge
}

func setupMetrics() *MetricsCollector {
    return &MetricsCollector{
        graphExecutions: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "petalflow_graph_executions_total",
                Help: "Total number of graph executions",
            },
            []string{"graph_name", "status"},
        ),
        graphDuration: promauto.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "petalflow_graph_duration_seconds",
                Help:    "Graph execution duration in seconds",
                Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 30, 60},
            },
            []string{"graph_name"},
        ),
        nodeExecutions: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "petalflow_node_executions_total",
                Help: "Total number of node executions",
            },
            []string{"graph_name", "node_id", "node_type"},
        ),
        nodeDuration: promauto.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "petalflow_node_duration_seconds",
                Help:    "Node execution duration in seconds",
                Buckets: []float64{.01, .05, .1, .25, .5, 1, 2.5, 5, 10},
            },
            []string{"graph_name", "node_id"},
        ),
        nodeErrors: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "petalflow_node_errors_total",
                Help: "Total number of node errors",
            },
            []string{"graph_name", "node_id", "error_type"},
        ),
        routeDecisions: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "petalflow_route_decisions_total",
                Help: "Total routing decisions by target",
            },
            []string{"graph_name", "router_id", "target"},
        ),
        cacheHits: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "petalflow_cache_hits_total",
                Help: "Total cache hits",
            },
            []string{"graph_name", "cache_id"},
        ),
        cacheMisses: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "petalflow_cache_misses_total",
                Help: "Total cache misses",
            },
            []string{"graph_name", "cache_id"},
        ),
        activeExecutions: promauto.NewGauge(
            prometheus.GaugeOpts{
                Name: "petalflow_active_executions",
                Help: "Number of currently active graph executions",
            },
        ),
        humanReviewsQueue: promauto.NewGauge(
            prometheus.GaugeOpts{
                Name: "petalflow_human_reviews_pending",
                Help: "Number of items pending human review",
            },
        ),
    }
}
// MetricsEventHandler records Prometheus metrics
type MetricsEventHandler struct {
    collector *MetricsCollector
    graphName string
    startTime time.Time
}

func NewMetricsEventHandler(collector *MetricsCollector, graphName string) *MetricsEventHandler {
    return &MetricsEventHandler{
        collector: collector,
        graphName: graphName,
    }
}

func (h *MetricsEventHandler) Handle(event petalflow.Event) {
    labels := prometheus.Labels{
        "graph_name": h.graphName,
    }
    switch event.Kind {
    case petalflow.EventGraphStart:
        h.startTime = event.Timestamp
        h.collector.activeExecutions.Inc()
    case petalflow.EventGraphEnd:
        h.collector.activeExecutions.Dec()
        duration := event.Timestamp.Sub(h.startTime).Seconds()
        h.collector.graphDuration.With(labels).Observe(duration)
        status := "success"
        if event.Error != nil {
            status = "error"
        }
        labels["status"] = status
        h.collector.graphExecutions.With(labels).Inc()
    case petalflow.EventNodeStart:
        labels["node_id"] = event.NodeID
        // Guard the type assertion so a missing node_type cannot panic.
        nodeType, _ := event.Data["node_type"].(string)
        if nodeType == "" {
            nodeType = "unknown"
        }
        labels["node_type"] = nodeType
        h.collector.nodeExecutions.With(labels).Inc()
    case petalflow.EventNodeEnd:
        labels["node_id"] = event.NodeID
        h.collector.nodeDuration.With(labels).Observe(event.Duration.Seconds())
    case petalflow.EventNodeError:
        labels["node_id"] = event.NodeID
        labels["error_type"] = categorizeError(event.Error)
        h.collector.nodeErrors.With(labels).Inc()
    case petalflow.EventRouteDecision:
        labels["router_id"] = event.NodeID
        target, _ := event.Data["target"].(string)
        labels["target"] = target
        h.collector.routeDecisions.With(labels).Inc()
    case petalflow.EventCacheHit:
        labels["cache_id"] = event.NodeID
        h.collector.cacheHits.With(labels).Inc()
    case petalflow.EventCacheMiss:
        labels["cache_id"] = event.NodeID
        h.collector.cacheMisses.With(labels).Inc()
    case petalflow.EventHumanPending:
        h.collector.humanReviewsQueue.Inc()
    case petalflow.EventHumanResolved:
        h.collector.humanReviewsQueue.Dec()
    }
}
func categorizeError(err error) string {
    errStr := err.Error()
    switch {
    case strings.Contains(errStr, "timeout"):
        return "timeout"
    case strings.Contains(errStr, "rate limit"):
        return "rate_limit"
    case strings.Contains(errStr, "validation"):
        return "validation"
    default:
        return "unknown"
    }
}
func setupTracer() trace.Tracer {
    // Returns the tracer from the global provider, which is a no-op until a
    // TracerProvider is registered. In production, configure one for your
    // tracing backend (OTLP, Jaeger, Zipkin, etc.); see the sketch after the listing.
    return otel.Tracer("petalflow")
}
// TracingEventHandler creates OpenTelemetry spans
type TracingEventHandler struct {
    tracer   trace.Tracer
    spans    map[string]trace.Span
    rootSpan trace.Span
    rootCtx  context.Context
    mu       sync.Mutex
}

func NewTracingEventHandler(ctx context.Context, tracer trace.Tracer, graphName string) *TracingEventHandler {
    rootCtx, rootSpan := tracer.Start(ctx, graphName,
        trace.WithAttributes(
            attribute.String("graph.name", graphName),
        ),
    )
    return &TracingEventHandler{
        tracer:   tracer,
        spans:    make(map[string]trace.Span),
        rootSpan: rootSpan,
        rootCtx:  rootCtx,
    }
}

func (h *TracingEventHandler) Handle(event petalflow.Event) {
    h.mu.Lock()
    defer h.mu.Unlock()
    switch event.Kind {
    case petalflow.EventNodeStart:
        _, span := h.tracer.Start(h.rootCtx, event.NodeID,
            trace.WithAttributes(
                attribute.String("node.id", event.NodeID),
            ),
        )
        h.spans[event.NodeID] = span
    case petalflow.EventNodeEnd:
        if span, ok := h.spans[event.NodeID]; ok {
            span.SetAttributes(
                attribute.Int64("duration_ms", event.Duration.Milliseconds()),
            )
            span.End()
            delete(h.spans, event.NodeID)
        }
    case petalflow.EventNodeError:
        if span, ok := h.spans[event.NodeID]; ok {
            span.RecordError(event.Error)
            span.End()
            delete(h.spans, event.NodeID)
        }
    case petalflow.EventRouteDecision:
        target, _ := event.Data["target"].(string)
        h.rootSpan.AddEvent("route_decision",
            trace.WithAttributes(
                attribute.String("router", event.NodeID),
                attribute.String("target", target),
            ),
        )
    case petalflow.EventGraphEnd:
        h.rootSpan.End()
    }
}

func (h *TracingEventHandler) TraceID() string {
    return h.rootSpan.SpanContext().TraceID().String()
}
// AuditEvent represents an auditable action
type AuditEvent struct {
    Timestamp  time.Time      `json:"timestamp"`
    TraceID    string         `json:"trace_id"`
    GraphName  string         `json:"graph_name"`
    EventType  string         `json:"event_type"`
    NodeID     string         `json:"node_id,omitempty"`
    UserID     string         `json:"user_id,omitempty"`
    Action     string         `json:"action"`
    Details    map[string]any `json:"details,omitempty"`
    InputHash  string         `json:"input_hash,omitempty"`
    OutputHash string         `json:"output_hash,omitempty"`
}

// AuditLogger writes audit events for compliance
type AuditLogger struct {
    writer    *json.Encoder
    graphName string
    traceID   string
    mu        sync.Mutex
}

func NewAuditLogger(graphName, traceID string) *AuditLogger {
    // In production, write to secure audit log storage
    return &AuditLogger{
        writer:    json.NewEncoder(os.Stdout),
        graphName: graphName,
        traceID:   traceID,
    }
}

func (a *AuditLogger) Handle(event petalflow.Event) {
    a.mu.Lock()
    defer a.mu.Unlock()

    // Only audit significant events
    var auditEvent *AuditEvent
    switch event.Kind {
    case petalflow.EventGraphStart:
        auditEvent = &AuditEvent{
            EventType: "graph_start",
            Action:    "execution_started",
            Details:   event.Data,
        }
    case petalflow.EventGraphEnd:
        status := "success"
        if event.Error != nil {
            status = "failure"
        }
        auditEvent = &AuditEvent{
            EventType: "graph_end",
            Action:    "execution_completed",
            Details: map[string]any{
                "status":   status,
                "duration": event.Duration.String(),
            },
        }
    case petalflow.EventNodeError:
        auditEvent = &AuditEvent{
            EventType: "node_error",
            NodeID:    event.NodeID,
            Action:    "error_occurred",
            Details: map[string]any{
                "error": event.Error.Error(),
            },
        }
    case petalflow.EventHumanResolved:
        // Guard the type assertion so a missing reviewer_id cannot panic.
        reviewerID, _ := event.Data["reviewer_id"].(string)
        auditEvent = &AuditEvent{
            EventType: "human_review",
            NodeID:    event.NodeID,
            Action:    "review_completed",
            UserID:    reviewerID,
            Details: map[string]any{
                "decision": event.Data["approved"],
                "notes":    event.Data["notes"],
            },
        }
    case petalflow.EventRouteDecision:
        auditEvent = &AuditEvent{
            EventType: "route_decision",
            NodeID:    event.NodeID,
            Action:    "routing_decision",
            Details: map[string]any{
                "target": event.Data["target"],
                "reason": event.Data["reason"],
            },
        }
    }
    if auditEvent != nil {
        auditEvent.Timestamp = event.Timestamp
        auditEvent.TraceID = a.traceID
        auditEvent.GraphName = a.graphName
        a.writer.Encode(auditEvent)
    }
}
// CompositeObserver combines multiple event handlers
type CompositeObserver struct {
    logger  *slog.Logger
    metrics *MetricsCollector
    tracer  trace.Tracer
}

func NewCompositeObserver(logger *slog.Logger, metrics *MetricsCollector, tracer trace.Tracer) *CompositeObserver {
    return &CompositeObserver{
        logger:  logger,
        metrics: metrics,
        tracer:  tracer,
    }
}

func (o *CompositeObserver) CreateHandler(ctx context.Context, graphName string) petalflow.EventHandler {
    // Create individual handlers from the shared dependencies. Reusing the
    // MetricsCollector matters: calling setupMetrics() again would try to
    // re-register the same metrics with Prometheus and panic.
    loggingHandler := NewLoggingEventHandler(o.logger)
    metricsHandler := NewMetricsEventHandler(o.metrics, graphName)
    tracingHandler := NewTracingEventHandler(ctx, o.tracer, graphName)
    auditHandler := NewAuditLogger(graphName, tracingHandler.TraceID())

    // Return composite handler
    return func(event petalflow.Event) {
        // Add trace ID to event data so every handler can correlate output
        if event.Data == nil {
            event.Data = map[string]any{}
        }
        event.Data["trace_id"] = tracingHandler.TraceID()

        // Call all handlers
        loggingHandler.Handle(event)
        metricsHandler.Handle(event)
        tracingHandler.Handle(event)
        auditHandler.Handle(event)
    }
}
func buildSampleGraph(client *irisadapter.ProviderAdapter) petalflow.Graph {
    g := petalflow.NewGraph("observable-workflow")

    // Validation
    g.AddNode(petalflow.NewGuardianNode("validate", petalflow.GuardianNodeConfig{
        Checks: []petalflow.GuardCheck{
            {Var: "input", Op: petalflow.OpNotEmpty, Message: "Input required"},
        },
    }))

    // Processing with cache
    g.AddNode(petalflow.NewCacheNode("cache", petalflow.CacheNodeConfig{
        Store:    petalflow.NewMemoryCacheStore(),
        CacheKey: "process:{{.Vars.input_hash}}",
        TTL:      10 * time.Minute,
    }))

    // LLM processing
    g.AddNode(petalflow.NewLLMNode("process", client, petalflow.LLMNodeConfig{
        Model:          "gpt-4o-mini",
        PromptTemplate: "Process: {{.Vars.input}}",
        OutputKey:      "result",
    }))

    // Router
    g.AddNode(petalflow.NewRuleRouter("route", petalflow.RuleRouterConfig{
        Routes: []petalflow.RouteRule{
            {When: petalflow.RouteCondition{Var: "result.score", Op: petalflow.OpGt, Value: 0.8}, To: "high_confidence"},
        },
        Default: "low_confidence",
    }))

    // Outputs
    g.AddNode(petalflow.NewTransformNode("high_confidence", petalflow.TransformNodeConfig{
        Transform: func(inputs map[string]any) (any, error) { return inputs, nil },
        OutputKey: "output",
    }))
    g.AddNode(petalflow.NewTransformNode("low_confidence", petalflow.TransformNodeConfig{
        Transform: func(inputs map[string]any) (any, error) { return inputs, nil },
        OutputKey: "output",
    }))

    // Edges
    g.AddEdge("validate", "cache")
    g.AddEdge("cache", "process")
    g.AddEdge("process", "route")
    g.AddEdge("route", "high_confidence")
    g.AddEdge("route", "low_confidence")
    g.SetEntry("validate")
    return g
}
func runObservedWorkflow(graph petalflow.Graph, observer *CompositeObserver) {
    runtime := petalflow.NewRuntime()
    ctx := context.Background()

    env := petalflow.NewEnvelope()
    env.SetVar("input", "Sample input for processing")
    env.SetVar("input_hash", hashInput("Sample input for processing"))

    // Create composite handler
    handler := observer.CreateHandler(ctx, "observable-workflow")

    result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{
        EventHandler: handler,
    })
    if err != nil {
        slog.Error("workflow failed", "error", err)
        return
    }
    output := result.GetVar("output")
    slog.Info("workflow completed", "output", output)
}

func hashInput(input string) string {
    // Truncated SHA-256 digest, so the cache key is stable for any input length
    sum := sha256.Sum256([]byte(input))
    return fmt.Sprintf("%x", sum[:8])
}
{"time":"2024-01-15T10:30:00.000Z","level":"INFO","msg":"graph execution started","event_kind":"graph_start","node_id":"","trace_id":"abc123"}
{"time":"2024-01-15T10:30:00.001Z","level":"DEBUG","msg":"node started","event_kind":"node_start","node_id":"validate","trace_id":"abc123"}
{"time":"2024-01-15T10:30:00.002Z","level":"INFO","msg":"node completed","event_kind":"node_end","node_id":"validate","duration":"1ms","trace_id":"abc123"}
{"time":"2024-01-15T10:30:00.003Z","level":"DEBUG","msg":"cache miss","event_kind":"cache_miss","node_id":"cache","trace_id":"abc123"}
{"time":"2024-01-15T10:30:00.004Z","level":"DEBUG","msg":"node started","event_kind":"node_start","node_id":"process","trace_id":"abc123"}
{"time":"2024-01-15T10:30:01.204Z","level":"INFO","msg":"node completed","event_kind":"node_end","node_id":"process","duration":"1.2s","trace_id":"abc123"}
{"time":"2024-01-15T10:30:01.205Z","level":"INFO","msg":"route decision made","event_kind":"route_decision","node_id":"route","target":"high_confidence","trace_id":"abc123"}
{"time":"2024-01-15T10:30:01.207Z","level":"INFO","msg":"graph execution completed","event_kind":"graph_end","duration":"1.207s","trace_id":"abc123"}
Example PromQL queries for dashboards and alerting:

# P95 latency by graph
histogram_quantile(0.95, rate(petalflow_graph_duration_seconds_bucket[5m]))
# Average node latency
rate(petalflow_node_duration_seconds_sum[5m]) / rate(petalflow_node_duration_seconds_count[5m])
# Error rate by graph
sum(rate(petalflow_node_errors_total[5m])) by (graph_name)
/ sum(rate(petalflow_node_executions_total[5m])) by (graph_name)
# Cache hit rate
sum(rate(petalflow_cache_hits_total[5m]))
/ (sum(rate(petalflow_cache_hits_total[5m])) + sum(rate(petalflow_cache_misses_total[5m])))
# Route decisions by target
sum(rate(petalflow_route_decisions_total[5m])) by (target)
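
The feature list at the top also mentions real-time dashboards via event streaming, which the example does not demonstrate. A minimal sketch of one way to feed a dashboard, assuming it consumes events from a buffered channel; the StreamingEventHandler type is illustrative and relies only on the petalflow.Event fields used above:

package main

import "github.com/petal-labs/petalflow"

// StreamingEventHandler fans workflow events into a channel that a dashboard
// consumer (for example, a WebSocket broadcaster) can range over. It drops
// events instead of blocking the workflow if the consumer falls behind.
type StreamingEventHandler struct {
    events chan petalflow.Event
}

func NewStreamingEventHandler(buffer int) *StreamingEventHandler {
    return &StreamingEventHandler{events: make(chan petalflow.Event, buffer)}
}

func (h *StreamingEventHandler) Handle(event petalflow.Event) {
    select {
    case h.events <- event:
    default: // dashboard is lagging; drop rather than stall the graph
    }
}

// Events returns the stream for a dashboard consumer.
func (h *StreamingEventHandler) Events() <-chan petalflow.Event {
    return h.events
}

Wiring it in would mean adding its Handle call alongside the others inside CompositeObserver.CreateHandler.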