Skip to content

Debugging Runs

PetalFlow provides rich debugging capabilities through event handlers, step controllers, and tracing. This guide covers techniques for development debugging, production monitoring, and troubleshooting common issues.

The runtime emits events throughout graph execution. Subscribe to events with an EventHandler to observe, log, or react to execution flow.

| Event Kind | Description | When Emitted |
|---|---|---|
| EventGraphStart | Graph execution begins | Before first node |
| EventGraphEnd | Graph execution completes | After last node or error |
| EventNodeStart | Node execution begins | Before node.Execute() |
| EventNodeEnd | Node execution completes | After node.Execute() returns |
| EventNodeError | Node execution failed | When node returns error |
| EventRouteDecision | Router made routing choice | After router evaluates |
| EventBranchStart | Parallel branch begins | When fan-out occurs |
| EventBranchMerge | Parallel branches merged | When MergeNode completes |
| EventCacheHit | Cache returned stored value | When CacheNode finds match |
| EventCacheMiss | Cache had no stored value | When CacheNode has no match |
| EventHumanPending | Awaiting human approval | When HumanNode pauses |
| EventHumanResolved | Human approval received | When HumanNode resumes |

// Event is the record the runtime emits during graph execution and passes
// to an EventHandler.
type Event struct {
Kind EventKind // Which kind of event this is (one of the Event* kinds)
NodeID string // ID of the node that triggered the event
Timestamp time.Time // When the event occurred
Duration time.Duration // Execution time; populated for End events
Data map[string]any // Event-specific payload, keyed by name
Error error // Error details; populated for Error events
}
// handler logs node lifecycle and routing events as the runtime emits them.
// Event kinds without a case below are ignored.
handler := func(ev petalflow.Event) {
	id := ev.NodeID
	switch ev.Kind {
	case petalflow.EventNodeStart:
		log.Printf("[START] %s at %v", id, ev.Timestamp)
	case petalflow.EventNodeEnd:
		log.Printf("[END] %s (took %v)", id, ev.Duration)
	case petalflow.EventNodeError:
		log.Printf("[ERROR] %s: %v", id, ev.Error)
	case petalflow.EventRouteDecision:
		log.Printf("[ROUTE] %s -> %s", id, ev.Data["target"])
	}
}

// Register the handler through RunOptions when starting the run.
runtime := petalflow.NewRuntime()
opts := petalflow.RunOptions{EventHandler: handler}
result, err := runtime.Run(ctx, graph, env, opts)

For production systems, emit structured logs that integrate with your logging infrastructure:

// structuredLogHandler returns an EventHandler that writes each event to
// logger as a structured record.
//
// Every record carries node_id, timestamp and event_kind, plus duration when
// it is positive and one attribute per entry in event.Data. Node errors are
// logged at Error level, node completions at Info, and everything else at
// Debug.
func structuredLogHandler(logger *slog.Logger) petalflow.EventHandler {
	return func(event petalflow.Event) {
		// The handler is given only the event, so there is no request
		// context in scope; log with a background context.
		ctx := context.Background()

		// Room for the three fixed attributes, the optional duration and
		// error, and the event payload.
		attrs := make([]slog.Attr, 0, 5+len(event.Data))
		attrs = append(attrs,
			slog.String("node_id", event.NodeID),
			slog.Time("timestamp", event.Timestamp),
			slog.String("event_kind", string(event.Kind)),
		)
		if event.Duration > 0 {
			attrs = append(attrs, slog.Duration("duration", event.Duration))
		}
		for k, v := range event.Data {
			attrs = append(attrs, slog.Any(k, v))
		}

		switch event.Kind {
		case petalflow.EventNodeError:
			// Guard against an error event that carries no error value;
			// calling Error() on a nil error would panic.
			if event.Error != nil {
				attrs = append(attrs, slog.String("error", event.Error.Error()))
			}
			logger.LogAttrs(ctx, slog.LevelError, "node execution failed", attrs...)
		case petalflow.EventNodeStart:
			logger.LogAttrs(ctx, slog.LevelDebug, "node started", attrs...)
		case petalflow.EventNodeEnd:
			logger.LogAttrs(ctx, slog.LevelInfo, "node completed", attrs...)
		default:
			logger.LogAttrs(ctx, slog.LevelDebug, "graph event", attrs...)
		}
	}
}

Store events for post-execution analysis or audit trails:

// EventCollector accumulates the events passed to its handler so they can be
// inspected later. All access to the stored events is guarded by mu.
type EventCollector struct {
	events []petalflow.Event
	mu     sync.Mutex
}

// Handler returns an EventHandler that appends each event it receives to the
// collector.
func (c *EventCollector) Handler() petalflow.EventHandler {
	record := func(ev petalflow.Event) {
		c.mu.Lock()
		c.events = append(c.events, ev)
		c.mu.Unlock()
	}
	return record
}

// Events returns a copy of the events collected so far; the returned slice
// does not share storage with the collector.
func (c *EventCollector) Events() []petalflow.Event {
	c.mu.Lock()
	defer c.mu.Unlock()

	snapshot := make([]petalflow.Event, len(c.events))
	copy(snapshot, c.events)
	return snapshot
}

// FindByNode returns the collected events whose NodeID equals nodeID, in the
// order they were recorded. The result is nil when nothing matches.
func (c *EventCollector) FindByNode(nodeID string) []petalflow.Event {
	c.mu.Lock()
	defer c.mu.Unlock()

	var found []petalflow.Event
	for i := range c.events {
		if c.events[i].NodeID != nodeID {
			continue
		}
		found = append(found, c.events[i])
	}
	return found
}
// Usage: attach the collector's handler to a run.
collector := new(EventCollector)
runtime.Run(ctx, graph, env, petalflow.RunOptions{EventHandler: collector.Handler()})

// After the run returns, report every node that took longer than one second.
for _, ev := range collector.Events() {
	if ev.Kind != petalflow.EventNodeEnd || ev.Duration <= time.Second {
		continue
	}
	log.Printf("Slow node: %s took %v", ev.NodeID, ev.Duration)
}

Step controllers enable fine-grained control over graph execution. Use them for debugging, human-in-the-loop workflows, or testing specific paths.

Pause execution at specific nodes:

// Register breakpoints on the "review" and "approval" nodes
controller := petalflow.NewBreakpointStepController([]string{"review", "approval"})
// Wrap the controller in run options so the runtime consults it
opts := petalflow.NewStepRunOptions(controller)
// Graph pauses when reaching "review" or "approval" nodes
result, err := runtime.Run(ctx, graph, env, opts)

Step through nodes one at a time for debugging:

// InteractiveController is a step controller that holds execution before
// every node until Step is called.
type InteractiveController struct {
	stepChan chan struct{} // unbuffered: Step hands one signal to one waiting BeforeNode
}

// NewInteractiveController returns a controller ready to be stepped.
func NewInteractiveController() *InteractiveController {
	return &InteractiveController{stepChan: make(chan struct{})}
}

// BeforeNode prints the node about to run and the envelope's variables, then
// blocks until Step is called. If ctx is cancelled while waiting it returns
// ctx.Err() so a cancelled or timed-out run does not hang on the prompt.
func (c *InteractiveController) BeforeNode(ctx context.Context, nodeID string, env *petalflow.Envelope) error {
	fmt.Printf("About to execute: %s\n", nodeID)
	fmt.Printf("Envelope vars: %+v\n", env.Vars())
	fmt.Print("Press Enter to continue...")

	select {
	case <-c.stepChan: // released by Step
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// AfterNode reports that the node has finished; it never blocks.
func (c *InteractiveController) AfterNode(ctx context.Context, nodeID string, env *petalflow.Envelope) error {
	fmt.Printf("Completed: %s\n", nodeID)
	return nil
}

// Step releases one pending BeforeNode call. The channel is unbuffered, so
// Step blocks until a BeforeNode call is waiting to receive the signal.
func (c *InteractiveController) Step() {
	c.stepChan <- struct{}{}
}
// Usage: advance the run by one node for every line read from stdin.
controller := NewInteractiveController()
go func() {
	stdin := bufio.NewReader(os.Stdin)
	for {
		// Only the arrival of a line matters; its text and any read error
		// are discarded.
		_, _ = stdin.ReadString('\n')
		controller.Step()
	}
}()
stepOpts := petalflow.NewStepRunOptions(controller)
runtime.Run(ctx, graph, env, stepOpts)

Pause only when specific conditions are met:

// ConditionalController pauses execution before a node only when condition
// reports true for that node and envelope.
type ConditionalController struct {
	condition func(nodeID string, env *petalflow.Envelope) bool
	paused    chan struct{} // a receive on this channel resumes a paused run
}

// BeforeNode evaluates the condition and, when it holds, logs the breakpoint
// and blocks until a value arrives on (or the owner closes) paused. If ctx is
// cancelled while waiting it returns ctx.Err() instead of blocking forever.
func (c *ConditionalController) BeforeNode(ctx context.Context, nodeID string, env *petalflow.Envelope) error {
	if !c.condition(nodeID, env) {
		return nil
	}
	log.Printf("Breakpoint hit at %s", nodeID)

	select {
	case <-c.paused: // resumed
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Pause once the envelope's "error_count" variable is an int greater than 5.
controller := &ConditionalController{
	paused: make(chan struct{}),
	condition: func(_ string, env *petalflow.Envelope) bool {
		// A missing or non-int value never triggers the breakpoint.
		count, isInt := env.GetVar("error_count").(int)
		return isInt && count > 5
	},
}

Export traces to your observability platform:

// otelEventHandler returns an EventHandler that turns node events into
// OpenTelemetry spans: a span is started on EventNodeStart and ended on the
// matching EventNodeEnd or EventNodeError.
//
// Open spans are tracked by node ID in a map guarded by a mutex. End and
// error events for a node with no open span are ignored.
// NOTE(review): because spans are keyed by node ID alone, a second start for
// the same node before its end replaces the first span without ending it.
func otelEventHandler(tracer trace.Tracer) petalflow.EventHandler {
	spans := make(map[string]trace.Span)
	var mu sync.Mutex

	return func(event petalflow.Event) {
		mu.Lock()
		defer mu.Unlock()

		switch event.Kind {
		case petalflow.EventNodeStart:
			// The derived context is not needed: spans are looked up by
			// node ID rather than propagated through a context.
			_, span := tracer.Start(context.Background(), event.NodeID,
				trace.WithAttributes(
					attribute.String("node.id", event.NodeID),
				))
			spans[event.NodeID] = span

		case petalflow.EventNodeEnd:
			span, ok := spans[event.NodeID]
			if !ok {
				return
			}
			span.SetAttributes(
				attribute.Int64("duration_ms", event.Duration.Milliseconds()),
			)
			span.End()
			delete(spans, event.NodeID)

		case petalflow.EventNodeError:
			span, ok := spans[event.NodeID]
			if !ok {
				return
			}
			// Calling Error() on a nil error would panic, so fall back to a
			// fixed description when the event carries no error value.
			if event.Error != nil {
				span.RecordError(event.Error)
				span.SetStatus(codes.Error, event.Error.Error())
			} else {
				span.SetStatus(codes.Error, "node execution failed")
			}
			span.End()
			delete(spans, event.NodeID)
		}
	}
}

Track execution metrics for monitoring dashboards:

// MetricsHandler groups the Prometheus collectors that are updated from
// runtime events.
type MetricsHandler struct {
	nodeExecutions *prometheus.CounterVec   // incremented on EventNodeStart
	nodeDurations  *prometheus.HistogramVec // observes node duration in seconds on EventNodeEnd
	nodeErrors     *prometheus.CounterVec   // incremented on EventNodeError
	routeDecisions *prometheus.CounterVec   // incremented on EventRouteDecision, labelled by target
}

// Handler returns an EventHandler that updates the collectors for each
// event. Every series is labelled with node_id; route decisions also carry a
// target label.
func (m *MetricsHandler) Handler() petalflow.EventHandler {
	return func(event petalflow.Event) {
		labels := prometheus.Labels{"node_id": event.NodeID}
		switch event.Kind {
		case petalflow.EventNodeStart:
			m.nodeExecutions.With(labels).Inc()
		case petalflow.EventNodeEnd:
			m.nodeDurations.With(labels).Observe(event.Duration.Seconds())
		case petalflow.EventNodeError:
			m.nodeErrors.With(labels).Inc()
		case petalflow.EventRouteDecision:
			// A metrics hook must not panic the run: when the target is
			// missing or not a string, still count the decision under a
			// placeholder label.
			target, ok := event.Data["target"].(string)
			if !ok {
				target = "unknown"
			}
			m.routeDecisions.With(prometheus.Labels{
				"node_id": event.NodeID,
				"target":  target,
			}).Inc()
		}
	}
}

Symptoms: Execution hangs indefinitely without reaching the end.

Possible Causes:

  1. Missing edges: A node has no outgoing edge to continue execution.

    // Check: warn about every non-terminal node that has no outgoing edge.
    for _, n := range graph.Nodes() {
    	if len(graph.OutgoingEdges(n.ID())) > 0 || isTerminalNode(n) {
    		continue
    	}
    	log.Printf("WARNING: %s has no outgoing edges", n.ID())
    }
  2. Deadlocked MergeNode: Parallel branches don’t all reach the merge point.

    // Debug: when "merge_node" starts, log the "received_from" entry of its
    // event data.
    handler := func(ev petalflow.Event) {
    	if ev.Kind != petalflow.EventNodeStart || ev.NodeID != "merge_node" {
    		return
    	}
    	log.Printf("Merge receiving from: %v", ev.Data["received_from"])
    }
  3. Infinite loop in cyclic graph: Exit condition never triggers.

    // Add iteration tracking
    transform := petalflow.NewTransformNode("track_iterations", petalflow.TransformNodeConfig{
    Transform: func(inputs map[string]any) (any, error) {
    iter := inputs["iteration"].(int) + 1
    if iter > 100 {
    return nil, fmt.Errorf("exceeded max iterations")
    }
    return iter, nil
    },
    })

Symptoms: Execution takes wrong branch at router nodes.

Debugging Steps:

  1. Add an event handler to log route decisions

    // Log the chosen target and the matched rule for every route decision.
    handler := func(ev petalflow.Event) {
    	if ev.Kind != petalflow.EventRouteDecision {
    		return
    	}
    	target, rule := ev.Data["target"], ev.Data["matched_rule"]
    	log.Printf("Router %s chose %s (reason: %v)", ev.NodeID, target, rule)
    }
  2. Inspect envelope state before router

    // Log the envelope's variables for the "my_router" node.
    // NOTE(review): InspectController is not defined in this guide; presumably
    // it calls inspect before the node named by nodeID runs — confirm against
    // its definition.
    controller := &InspectController{
    nodeID: "my_router",
    inspect: func(env *petalflow.Envelope) {
    log.Printf("Router input: %+v", env.Vars())
    },
    }
  3. Check rule order (first match wins)

    // Rules are evaluated in order and the first matching rule wins, so list
    // specific rules before general ones.
    Routes: []petalflow.RouteRule{
    {When: condition1, To: "specific_handler"}, // Check first
    {When: condition2, To: "general_handler"}, // Check second
    },

Symptoms: OutputKey is empty or nil after LLMNode execution.

Debugging:

// Detailed logging for a single LLM node: the prompt on start, the response
// and token usage on end, and the error on failure. Events from other nodes
// are ignored.
handler := func(ev petalflow.Event) {
	if ev.NodeID != "my_llm_node" {
		return
	}
	payload := ev.Data
	switch ev.Kind {
	case petalflow.EventNodeStart:
		log.Printf("LLM prompt: %s", payload["prompt"])
	case petalflow.EventNodeEnd:
		log.Printf("LLM response: %s", payload["response"])
		log.Printf("Token usage: %v", payload["usage"])
	case petalflow.EventNodeError:
		log.Printf("LLM error: %v", ev.Error)
	}
}

Common Causes:

  • Template variables not found in envelope
  • Model returned empty content (check stop sequences)
  • API rate limiting or timeout

Symptoms: Expected envelope key is nil or missing.

Debugging:

// Trace envelope mutations: after each node ends, log the "envelope_keys"
// entry of its event data.
handler := func(ev petalflow.Event) {
	if ev.Kind != petalflow.EventNodeEnd {
		return
	}
	keys := ev.Data["envelope_keys"]
	log.Printf("After %s, envelope keys: %v", ev.NodeID, keys)
}
// traceKey returns an EventHandler that, after every node ends, logs the
// value stored under key in the event's "envelope" data.
//
// When the event carries no envelope snapshot, or the snapshot is not a
// map[string]any, the handler logs that fact instead of panicking on the
// type assertion.
func traceKey(key string) petalflow.EventHandler {
	return func(event petalflow.Event) {
		if event.Kind != petalflow.EventNodeEnd {
			return
		}
		envelope, ok := event.Data["envelope"].(map[string]any)
		if !ok {
			log.Printf("After %s: no envelope snapshot in event data", event.NodeID)
			return
		}
		log.Printf("After %s: %s = %v", event.NodeID, key, envelope[key])
	}
}

Use sampling to reduce overhead in production:

// SampledHandler wraps another EventHandler and forwards only a random
// fraction of the events it sees.
type SampledHandler struct {
	underlying  petalflow.EventHandler // receives the events that pass sampling
	sampleRate  float64                // an event is forwarded when rand.Float64() is below this
	errorAlways bool                   // when set, EventNodeError events bypass sampling
}

// Handler returns the sampling EventHandler. Node errors are always forwarded
// when errorAlways is set; every other event is forwarded with probability
// sampleRate.
func (h *SampledHandler) Handler() petalflow.EventHandler {
	return func(ev petalflow.Event) {
		forced := h.errorAlways && ev.Kind == petalflow.EventNodeError
		// Short-circuit keeps the random draw from happening for forced events.
		if forced || rand.Float64() < h.sampleRate {
			h.underlying(ev)
		}
	}
}
// Capture 1% of events, but all errors
// Install it by passing sampled.Handler() as the run's EventHandler.
sampled := &SampledHandler{
underlying: structuredLogHandler(logger),
sampleRate: 0.01,
errorAlways: true,
}

Enable detailed logging for specific requests:

// debuggableRun executes graph with env and, when the context carries a true
// "debug_enabled" value, collects every event of the run and hands them to
// storeDebugEvents under the context's "request_id" once the run returns.
//
// It returns whatever runtime.Run returns. A context without the debug flag
// (or with a non-bool value) runs with default options.
//
// NOTE(review): plain string context keys can collide across packages; they
// are kept here because callers already set these exact keys.
func debuggableRun(ctx context.Context, runtime *petalflow.Runtime, graph petalflow.Graph, env *petalflow.Envelope) (*petalflow.Envelope, error) {
	var opts petalflow.RunOptions

	// Comma-ok assertion: most requests carry no debug flag, and a
	// single-value assertion on the resulting nil would panic.
	if enabled, _ := ctx.Value("debug_enabled").(bool); enabled {
		collector := &EventCollector{}
		opts.EventHandler = collector.Handler()
		defer func() {
			// Store events for later retrieval. Without a request ID there
			// is no key to store them under, so report that and drop them.
			requestID, ok := ctx.Value("request_id").(string)
			if !ok {
				log.Printf("debug events dropped: no request_id in context")
				return
			}
			storeDebugEvents(requestID, collector.Events())
		}()
	}
	return runtime.Run(ctx, graph, env, opts)
}

Store enough state to replay failures in development:

// ReplayableRun holds the state recorded for one graph run so that a failure
// can be examined later.
type ReplayableRun struct {
GraphName string // Name of the graph that was run
InitialEnv map[string]any // Envelope contents taken from the graph-start event
Events []petalflow.Event // Events recorded for the run, in arrival order
FinalEnv map[string]any // Envelope contents taken from the graph-end event
Error error // Error reported by the graph-end event, if any
Timestamp time.Time // When the record was created
}
// captureForReplay returns an EventHandler that records a run of the graph
// named graphName into a ReplayableRun and passes it to saveFailedRun when
// the graph ends with an error.
//
// Every event is appended to the record. The "envelope" entry of the
// graph-start and graph-end events becomes InitialEnv and FinalEnv; when that
// entry is missing or is not a map[string]any the field is left nil rather
// than panicking. The record is guarded by a mutex, matching the other
// stateful handlers in this guide.
func captureForReplay(graphName string) petalflow.EventHandler {
	run := &ReplayableRun{
		GraphName: graphName,
		Timestamp: time.Now(),
	}
	var mu sync.Mutex

	return func(event petalflow.Event) {
		mu.Lock()
		defer mu.Unlock()

		run.Events = append(run.Events, event)

		switch event.Kind {
		case petalflow.EventGraphStart:
			run.InitialEnv, _ = event.Data["envelope"].(map[string]any)
		case petalflow.EventGraphEnd:
			run.FinalEnv, _ = event.Data["envelope"].(map[string]any)
			if event.Error != nil {
				run.Error = event.Error
				saveFailedRun(run) // Store for later analysis
			}
		}
	}
}