Debugging Runs
PetalFlow provides rich debugging capabilities through event handlers, step controllers, and tracing. This guide covers techniques for development debugging, production monitoring, and troubleshooting common issues.
Runtime Events
The runtime emits events throughout graph execution. Subscribe to events with an EventHandler to observe, log, or react to execution flow.
Event Types
| Event Kind | Description | When Emitted |
|---|---|---|
| EventGraphStart | Graph execution begins | Before first node |
| EventGraphEnd | Graph execution completes | After last node or error |
| EventNodeStart | Node execution begins | Before node.Execute() |
| EventNodeEnd | Node execution completes | After node.Execute() returns |
| EventNodeError | Node execution failed | When node returns error |
| EventRouteDecision | Router made routing choice | After router evaluates |
| EventBranchStart | Parallel branch begins | When fan-out occurs |
| EventBranchMerge | Parallel branches merged | When MergeNode completes |
| EventCacheHit | Cache returned stored value | When CacheNode finds match |
| EventCacheMiss | Cache had no stored value | When CacheNode has no match |
| EventHumanPending | Awaiting human approval | When HumanNode pauses |
| EventHumanResolved | Human approval received | When HumanNode resumes |
Event Structure
    type Event struct {
        Kind      EventKind      // Event type
        NodeID    string         // Node that triggered event
        Timestamp time.Time      // When event occurred
        Duration  time.Duration  // Execution time (for End events)
        Data      map[string]any // Event-specific data
        Error     error          // Error details (for Error events)
    }

Basic Event Handler
    handler := func(event petalflow.Event) {
        switch event.Kind {
        case petalflow.EventNodeStart:
            log.Printf("[START] %s at %v", event.NodeID, event.Timestamp)

        case petalflow.EventNodeEnd:
            log.Printf("[END] %s (took %v)", event.NodeID, event.Duration)

        case petalflow.EventNodeError:
            log.Printf("[ERROR] %s: %v", event.NodeID, event.Error)

        case petalflow.EventRouteDecision:
            log.Printf("[ROUTE] %s -> %s", event.NodeID, event.Data["target"])
        }
    }

    runtime := petalflow.NewRuntime()
    result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{
        EventHandler: handler,
    })

Structured Logging Handler
For production systems, emit structured logs that integrate with your logging infrastructure:

    func structuredLogHandler(logger *slog.Logger) petalflow.EventHandler {
        return func(event petalflow.Event) {
            // The handler receives no context, so log with a background context
            ctx := context.Background()

            attrs := []slog.Attr{
                slog.String("node_id", event.NodeID),
                slog.Time("timestamp", event.Timestamp),
                slog.String("event_kind", string(event.Kind)),
            }

            if event.Duration > 0 {
                attrs = append(attrs, slog.Duration("duration", event.Duration))
            }

            for k, v := range event.Data {
                attrs = append(attrs, slog.Any(k, v))
            }

            switch event.Kind {
            case petalflow.EventNodeError:
                // slog.Any tolerates a nil error, unlike event.Error.Error()
                logger.LogAttrs(ctx, slog.LevelError, "node execution failed",
                    append(attrs, slog.Any("error", event.Error))...)
            case petalflow.EventNodeStart:
                logger.LogAttrs(ctx, slog.LevelDebug, "node started", attrs...)
            case petalflow.EventNodeEnd:
                logger.LogAttrs(ctx, slog.LevelInfo, "node completed", attrs...)
            default:
                logger.LogAttrs(ctx, slog.LevelDebug, "graph event", attrs...)
            }
        }
    }

Collecting Event History
Store events for post-execution analysis or audit trails:

    type EventCollector struct {
        events []petalflow.Event
        mu     sync.Mutex
    }

    func (c *EventCollector) Handler() petalflow.EventHandler {
        return func(event petalflow.Event) {
            c.mu.Lock()
            defer c.mu.Unlock()
            c.events = append(c.events, event)
        }
    }

    func (c *EventCollector) Events() []petalflow.Event {
        c.mu.Lock()
        defer c.mu.Unlock()
        return append([]petalflow.Event{}, c.events...)
    }

    func (c *EventCollector) FindByNode(nodeID string) []petalflow.Event {
        c.mu.Lock()
        defer c.mu.Unlock()

        var matches []petalflow.Event
        for _, e := range c.events {
            if e.NodeID == nodeID {
                matches = append(matches, e)
            }
        }
        return matches
    }

    // Usage
    collector := &EventCollector{}
    runtime.Run(ctx, graph, env, petalflow.RunOptions{EventHandler: collector.Handler()})

    // Analyze after execution
    for _, event := range collector.Events() {
        if event.Kind == petalflow.EventNodeEnd && event.Duration > time.Second {
            log.Printf("Slow node: %s took %v", event.NodeID, event.Duration)
        }
    }

Step Controllers
Step controllers enable fine-grained control over graph execution. Use them for debugging, human-in-the-loop workflows, or testing specific paths.
Breakpoint Controller
Pause execution at specific nodes:

    // Pause at the "review" and "approval" nodes
    controller := petalflow.NewBreakpointStepController([]string{"review", "approval"})
    opts := petalflow.NewStepRunOptions(controller)

    // Graph pauses when reaching "review" or "approval" nodes
    result, err := runtime.Run(ctx, graph, env, opts)

Interactive Step Controller
Step through nodes one at a time for debugging:

    type InteractiveController struct {
        stepChan chan struct{}
    }

    func NewInteractiveController() *InteractiveController {
        return &InteractiveController{stepChan: make(chan struct{})}
    }

    func (c *InteractiveController) BeforeNode(ctx context.Context, nodeID string, env *petalflow.Envelope) error {
        fmt.Printf("About to execute: %s\n", nodeID)
        fmt.Printf("Envelope vars: %+v\n", env.Vars())
        fmt.Print("Press Enter to continue...")

        // Wait for a step signal, but stop waiting if the run is cancelled
        select {
        case <-c.stepChan:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    func (c *InteractiveController) AfterNode(ctx context.Context, nodeID string, env *petalflow.Envelope) error {
        fmt.Printf("Completed: %s\n", nodeID)
        return nil
    }

    func (c *InteractiveController) Step() {
        c.stepChan <- struct{}{}
    }

    // Usage
    controller := NewInteractiveController()
    go func() {
        reader := bufio.NewReader(os.Stdin)
        for {
            if _, err := reader.ReadString('\n'); err != nil {
                return // stdin closed
            }
            controller.Step()
        }
    }()

    runtime.Run(ctx, graph, env, petalflow.NewStepRunOptions(controller))

Conditional Breakpoint Controller
Pause only when specific conditions are met:

    type ConditionalController struct {
        condition func(nodeID string, env *petalflow.Envelope) bool
        paused    chan struct{}
    }

    func (c *ConditionalController) BeforeNode(ctx context.Context, nodeID string, env *petalflow.Envelope) error {
        if c.condition(nodeID, env) {
            log.Printf("Breakpoint hit at %s", nodeID)
            // Wait for resume, but stop waiting if the run is cancelled
            select {
            case <-c.paused:
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        return nil
    }

    // Pause when error count exceeds threshold
    controller := &ConditionalController{
        condition: func(nodeID string, env *petalflow.Envelope) bool {
            errorCount, _ := env.GetVar("error_count").(int)
            return errorCount > 5
        },
        paused: make(chan struct{}),
    }

Tracing and Observability
OpenTelemetry Integration
Export traces to your observability platform:

    func otelEventHandler(tracer trace.Tracer) petalflow.EventHandler {
        spans := make(map[string]trace.Span)
        var mu sync.Mutex

        return func(event petalflow.Event) {
            mu.Lock()
            defer mu.Unlock()

            switch event.Kind {
            case petalflow.EventNodeStart:
                ctx, span := tracer.Start(context.Background(), event.NodeID,
                    trace.WithAttributes(
                        attribute.String("node.id", event.NodeID),
                    ))
                spans[event.NodeID] = span
                _ = ctx // Use for nested spans if needed

            case petalflow.EventNodeEnd:
                if span, ok := spans[event.NodeID]; ok {
                    span.SetAttributes(
                        attribute.Int64("duration_ms", event.Duration.Milliseconds()),
                    )
                    span.End()
                    delete(spans, event.NodeID)
                }

            case petalflow.EventNodeError:
                if span, ok := spans[event.NodeID]; ok {
                    span.RecordError(event.Error)
                    span.SetStatus(codes.Error, event.Error.Error())
                    span.End()
                    delete(spans, event.NodeID)
                }
            }
        }
    }

Metrics Collection
Track execution metrics for monitoring dashboards:

    type MetricsHandler struct {
        nodeExecutions *prometheus.CounterVec
        nodeDurations  *prometheus.HistogramVec
        nodeErrors     *prometheus.CounterVec
        routeDecisions *prometheus.CounterVec
    }

    func (m *MetricsHandler) Handler() petalflow.EventHandler {
        return func(event petalflow.Event) {
            labels := prometheus.Labels{"node_id": event.NodeID}

            switch event.Kind {
            case petalflow.EventNodeStart:
                m.nodeExecutions.With(labels).Inc()

            case petalflow.EventNodeEnd:
                m.nodeDurations.With(labels).Observe(event.Duration.Seconds())

            case petalflow.EventNodeError:
                m.nodeErrors.With(labels).Inc()

            case petalflow.EventRouteDecision:
                // Checked assertion: a missing or non-string target must not panic
                target, ok := event.Data["target"].(string)
                if !ok {
                    target = "unknown"
                }
                m.routeDecisions.With(prometheus.Labels{
                    "node_id": event.NodeID,
                    "target":  target,
                }).Inc()
            }
        }
    }

Troubleshooting Common Issues
Graph Never Completes
Symptoms: Execution hangs indefinitely without reaching the end.
Possible Causes:
- Missing edges: A node has no outgoing edge to continue execution.

      // Check: every non-terminal node should have outgoing edges
      for _, node := range graph.Nodes() {
          edges := graph.OutgoingEdges(node.ID())
          if len(edges) == 0 && !isTerminalNode(node) {
              log.Printf("WARNING: %s has no outgoing edges", node.ID())
          }
      }

- Deadlocked MergeNode: Parallel branches don't all reach the merge point.

      // Debug: add logging before merge
      handler := func(event petalflow.Event) {
          if event.NodeID == "merge_node" && event.Kind == petalflow.EventNodeStart {
              log.Printf("Merge receiving from: %v", event.Data["received_from"])
          }
      }

- Infinite loop in cyclic graph: Exit condition never triggers.

      // Add iteration tracking
      transform := petalflow.NewTransformNode("track_iterations", petalflow.TransformNodeConfig{
          Transform: func(inputs map[string]any) (any, error) {
              // A missing or non-int value counts as 0 instead of panicking
              iter, _ := inputs["iteration"].(int)
              iter++
              if iter > 100 {
                  return nil, fmt.Errorf("exceeded max iterations")
              }
              return iter, nil
          },
      })
Unexpected Routing Path
Symptoms: Execution takes the wrong branch at router nodes.
Debugging Steps:
- Add an event handler to log route decisions.

      handler := func(event petalflow.Event) {
          if event.Kind == petalflow.EventRouteDecision {
              log.Printf("Router %s chose %s (reason: %v)",
                  event.NodeID,
                  event.Data["target"],
                  event.Data["matched_rule"])
          }
      }

- Inspect the envelope state before the router.

      // InspectController is assumed to be a custom step controller defined elsewhere
      controller := &InspectController{
          nodeID: "my_router",
          inspect: func(env *petalflow.Envelope) {
              log.Printf("Router input: %+v", env.Vars())
          },
      }

- Check rule order (first match wins).

      // Rules are evaluated in order - put specific rules before general ones
      Routes: []petalflow.RouteRule{
          {When: condition1, To: "specific_handler"}, // Check first
          {When: condition2, To: "general_handler"},  // Check second
      },
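The effect of rule order is easiest to see with the same input run against two orderings. The following is a minimal sketch of first-match-wins evaluation. The `rule` type and `route` function are simplified, hypothetical stand-ins and not PetalFlow's router implementation.

```go
package main

import "fmt"

// rule is a simplified, hypothetical stand-in for a routing rule: a predicate
// over the envelope variables and the node to route to when it matches.
type rule struct {
	When func(vars map[string]any) bool
	To   string
}

// route returns the target of the first matching rule, or fallback if none match.
func route(rules []rule, vars map[string]any, fallback string) string {
	for _, r := range rules {
		if r.When(vars) {
			return r.To
		}
	}
	return fallback
}

func main() {
	isUrgentBilling := func(v map[string]any) bool {
		return v["topic"] == "billing" && v["urgent"] == true
	}
	isBilling := func(v map[string]any) bool { return v["topic"] == "billing" }

	vars := map[string]any{"topic": "billing", "urgent": true}

	specificFirst := []rule{{isUrgentBilling, "escalate"}, {isBilling, "billing_queue"}}
	generalFirst := []rule{{isBilling, "billing_queue"}, {isUrgentBilling, "escalate"}}

	fmt.Println(route(specificFirst, vars, "default")) // prints escalate
	fmt.Println(route(generalFirst, vars, "default"))  // prints billing_queue
}
```

With the general rule first, the specific rule is unreachable for any input the general rule also matches.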
LLM Node Returns Empty Response
Symptoms: OutputKey is empty or nil after LLMNode execution.
Debugging:

    // Add detailed LLM logging
    handler := func(event petalflow.Event) {
        if event.NodeID == "my_llm_node" {
            switch event.Kind {
            case petalflow.EventNodeStart:
                log.Printf("LLM prompt: %s", event.Data["prompt"])
            case petalflow.EventNodeEnd:
                log.Printf("LLM response: %s", event.Data["response"])
                log.Printf("Token usage: %v", event.Data["usage"])
            case petalflow.EventNodeError:
                log.Printf("LLM error: %v", event.Error)
            }
        }
    }

Common Causes:
- Template variables not found in envelope
- Model returned empty content (check stop sequences)
- API rate limiting or timeout
Envelope Data Missing
Symptoms: Expected envelope key is nil or missing.
Debugging:

    // Trace envelope mutations
    handler := func(event petalflow.Event) {
        if event.Kind == petalflow.EventNodeEnd {
            log.Printf("After %s, envelope keys: %v", event.NodeID, event.Data["envelope_keys"])
        }
    }

    // Check specific key
    func traceKey(key string) petalflow.EventHandler {
        return func(event petalflow.Event) {
            if event.Kind != petalflow.EventNodeEnd {
                return
            }
            // Checked assertion: the event may carry no envelope snapshot
            envelope, ok := event.Data["envelope"].(map[string]any)
            if !ok {
                log.Printf("After %s: no envelope data in event", event.NodeID)
                return
            }
            log.Printf("After %s: %s = %v", event.NodeID, key, envelope[key])
        }
    }

Production Debugging
Safe Production Inspection
Use sampling to reduce overhead in production:

    type SampledHandler struct {
        underlying  petalflow.EventHandler
        sampleRate  float64
        errorAlways bool
    }

    func (h *SampledHandler) Handler() petalflow.EventHandler {
        return func(event petalflow.Event) {
            // Always capture errors
            if h.errorAlways && event.Kind == petalflow.EventNodeError {
                h.underlying(event)
                return
            }

            // Sample other events
            if rand.Float64() < h.sampleRate {
                h.underlying(event)
            }
        }
    }

    // Capture 1% of events, but all errors
    sampled := &SampledHandler{
        underlying:  structuredLogHandler(logger),
        sampleRate:  0.01,
        errorAlways: true,
    }

Request-Level Debugging
Enable detailed logging for specific requests:

    func debuggableRun(ctx context.Context, runtime *petalflow.Runtime, graph petalflow.Graph, env *petalflow.Envelope) (*petalflow.Envelope, error) {
        // Check for debug header/flag; a missing value means debugging is off
        debugEnabled, _ := ctx.Value("debug_enabled").(bool)

        opts := petalflow.RunOptions{}
        if debugEnabled {
            collector := &EventCollector{}
            opts.EventHandler = collector.Handler()

            defer func() {
                // Store events for later retrieval
                requestID, _ := ctx.Value("request_id").(string)
                storeDebugEvents(requestID, collector.Events())
            }()
        }

        return runtime.Run(ctx, graph, env, opts)
    }

Replay Failed Runs
Store enough state to replay failures in development:

    type ReplayableRun struct {
        GraphName  string
        InitialEnv map[string]any
        Events     []petalflow.Event
        FinalEnv   map[string]any
        Error      error
        Timestamp  time.Time
    }

    func captureForReplay(graphName string) petalflow.EventHandler {
        run := &ReplayableRun{
            GraphName: graphName,
            Timestamp: time.Now(),
        }
        var mu sync.Mutex // events may arrive from parallel branches

        return func(event petalflow.Event) {
            mu.Lock()
            defer mu.Unlock()
            run.Events = append(run.Events, event)

            // Checked assertion: nil if the event carries no envelope snapshot
            envelope, _ := event.Data["envelope"].(map[string]any)

            switch event.Kind {
            case petalflow.EventGraphStart:
                run.InitialEnv = envelope
            case petalflow.EventGraphEnd:
                run.FinalEnv = envelope
                if event.Error != nil {
                    run.Error = event.Error
                    saveFailedRun(run) // Store for later analysis
                }
            }
        }
    }