Skip to content

PetalFlow Runtime API

The runtime package provides the execution engine for PetalFlow workflow graphs.

import "github.com/petal-labs/petalflow/runtime"

Or use the top-level re-exports:

import "github.com/petal-labs/petalflow"

| Type | Purpose |
| --- | --- |
| `Runtime` | Interface for executing graphs |
| `BasicRuntime` | Standard runtime implementation |
| `RunOptions` | Runtime configuration options |
| `Event` | Structured execution event |
| `EventKind` | Event type identifier |
| `EventHandler` | Function for handling events |
| `EventEmitter` | Function for emitting events |
| `StepController` | Interface for step-through debugging |
| `StepConfig` | Step-through configuration |
| `StepRequest` | Step point information |
| `StepResponse` | Controller decision |

The Runtime interface defines the contract for graph execution engines.

// Runtime defines the contract for graph execution engines.
type Runtime interface {
// Run executes the graph with the given initial envelope.
Run(ctx context.Context, g graph.Graph, env *core.Envelope, opts RunOptions) (*core.Envelope, error)
// Events returns a channel for receiving runtime events.
// The channel is closed when the run completes.
Events() <-chan Event
}

BasicRuntime is the standard implementation of the Runtime interface. It supports both sequential and parallel execution.

func NewRuntime() *BasicRuntime
rt := runtime.NewRuntime()
// Execute a graph
result, err := rt.Run(ctx, myGraph, envelope, runtime.DefaultRunOptions())
if err != nil {
log.Fatal(err)
}
// Process result
fmt.Println("Output:", result.GetVarString("output"))
// Enable parallel branch execution
opts := runtime.RunOptions{
MaxHops: 100,
Concurrency: 4, // 4 parallel workers
}
result, err := rt.Run(ctx, myGraph, envelope, opts)

RunOptions controls execution behavior for a graph run.

// RunOptions controls execution behavior for a graph run.
// It is passed by value as the last argument of Runtime.Run.
type RunOptions struct {
// MaxHops protects against infinite cycles (default: 100).
MaxHops int
// ContinueOnError records errors and continues when possible.
ContinueOnError bool
// Concurrency sets the worker pool size for parallel execution (default: 1).
Concurrency int
// Now provides the current time (for testing). If nil, uses time.Now.
Now func() time.Time
// EventHandler receives events during execution.
EventHandler EventHandler
// EventEmitterDecorator wraps the internal event emitter.
EventEmitterDecorator EventEmitterDecorator
// StepController enables step-through debugging.
StepController StepController
// StepConfig provides additional step-through configuration.
StepConfig *StepConfig
// EventBus distributes events to subscribers.
EventBus EventPublisher
}
func DefaultRunOptions() RunOptions

Returns sensible defaults:

| Option | Default Value |
| --- | --- |
| `MaxHops` | 100 |
| `ContinueOnError` | false |
| `Concurrency` | 1 |
// Create options with a step controller
func NewStepRunOptions(controller StepController) RunOptions
// Create options with a custom time provider (for testing)
func NewTimedRunOptions(now func() time.Time) RunOptions
opts := runtime.RunOptions{
MaxHops: 200,
ContinueOnError: true,
Concurrency: 4,
EventHandler: func(e runtime.Event) {
log.Printf("[%s] %s: %s", e.Kind, e.NodeID, e.Elapsed)
},
}
result, err := rt.Run(ctx, graph, envelope, opts)

EventKind identifies the type of event emitted by the runtime.

// EventKind identifies the type of event emitted by the runtime.
type EventKind string
// Event kinds emitted by the runtime, grouped by the part of the run they describe.
const (
// Run lifecycle
EventRunStarted EventKind = "run.started"
EventRunFinished EventKind = "run.finished"
EventRunSnapshot EventKind = "run.snapshot"
// Node lifecycle
EventNodeStarted EventKind = "node.started"
EventNodeFinished EventKind = "node.finished"
EventNodeFailed EventKind = "node.failed"
// Node output
EventNodeOutput EventKind = "node.output"
EventNodeOutputDelta EventKind = "node.output.delta"
EventNodeOutputFinal EventKind = "node.output.final"
EventNodeOutputPreview EventKind = "node.output.preview"
// Routing
EventRouteDecision EventKind = "route.decision"
// Tools
EventToolCall EventKind = "tool.call"
EventToolResult EventKind = "tool.result"
// Step control
EventStepPaused EventKind = "step.paused"
EventStepResumed EventKind = "step.resumed"
EventStepSkipped EventKind = "step.skipped"
EventStepAborted EventKind = "step.aborted"
)
// Event is a structured execution event emitted by the runtime.
// Run-level events leave NodeID and NodeKind empty.
type Event struct {
Kind EventKind // Event type identifier
RunID string // Unique run identifier
NodeID string // Node that produced this event (empty for run-level)
NodeKind core.NodeKind // Kind of node (empty for run-level)
Time time.Time // When the event occurred
Attempt int // Attempt number (1-indexed) for retry scenarios
Elapsed time.Duration // Duration since run or node started
Payload map[string]any // Event-specific data
Seq uint64 // Monotonic sequence number per run
TraceID string // OpenTelemetry trace ID (hex-encoded)
SpanID string // OpenTelemetry span ID (hex-encoded)
}
func NewEvent(kind EventKind, runID string) Event
| Method | Description |
| --- | --- |
| `WithNode(nodeID, nodeKind) Event` | Sets node information |
| `WithAttempt(attempt) Event` | Sets attempt number |
| `WithElapsed(elapsed) Event` | Sets elapsed duration |
| `WithPayload(key, value) Event` | Adds payload key-value pair |
event := runtime.NewEvent(runtime.EventNodeFinished, runID).
WithNode("process", core.NodeKindTransform).
WithElapsed(150 * time.Millisecond).
WithPayload("items_processed", 42)

// EventHandler is a function for handling events. Assign one to
// RunOptions.EventHandler to receive events during execution.
type EventHandler func(Event)
// EventEmitter is a function for emitting events.
type EventEmitter func(Event)

EventEmitterDecorator wraps an emitter to add cross-cutting behavior (e.g., trace metadata):

// EventEmitterDecorator takes an EventEmitter and returns an EventEmitter,
// allowing cross-cutting behavior (e.g., trace metadata) to be layered on.
type EventEmitterDecorator func(EventEmitter) EventEmitter

EventPublisher is the interface for distributing events to external subscribers:

// EventPublisher distributes events to external subscribers.
// It is the type of RunOptions.EventBus.
type EventPublisher interface {
// Publish hands a single event to the publisher.
Publish(event Event)
}
// Combines multiple handlers into one
func MultiEventHandler(handlers ...EventHandler) EventHandler
// Returns a handler that sends events to a channel
func ChannelEventHandler(ch chan<- Event) EventHandler
// Create a combined handler
handler := runtime.MultiEventHandler(
func(e runtime.Event) {
log.Printf("Event: %s", e.Kind)
},
func(e runtime.Event) {
metrics.RecordEvent(e.Kind, e.Elapsed)
},
)
opts := runtime.RunOptions{
EventHandler: handler,
}

The step controller system enables debugging and human-in-the-loop workflows.

// StepController is the interface for step-through debugging and
// human-in-the-loop workflows. Supply one via RunOptions.StepController.
type StepController interface {
// Step is called at each step point and blocks until a decision is made.
Step(ctx context.Context, req *StepRequest) (*StepResponse, error)
// ShouldPause returns true if execution should pause at this point.
ShouldPause(nodeID string, point StepPoint) bool
}
// StepAction is the decision a step controller returns in StepResponse.Action.
type StepAction string
// Actions a step controller can choose at a step point.
const (
StepActionContinue StepAction = "continue" // Proceed to next step
StepActionSkipNode StepAction = "skip" // Skip current node
StepActionAbort StepAction = "abort" // Stop execution
StepActionRunToBreakpoint StepAction = "run_to_breakpoint" // Continue until breakpoint
)
// StepPoint identifies when, relative to a node's execution, a step pause occurs.
type StepPoint string
// Points at which a step controller can be consulted.
const (
StepPointBeforeNode StepPoint = "before_node" // Before node executes
StepPointAfterNode StepPoint = "after_node" // After node executes
)

StepRequest carries the information provided to the controller at each step point:

// StepRequest is the information provided to a StepController at each step point.
type StepRequest struct {
ID string // Unique step identifier
RunID string // Workflow run identifier
Point StepPoint // When this pause occurred
NodeID string // ID of the node
NodeKind core.NodeKind // Type of node
Envelope *EnvelopeSnapshot // Read-only state snapshot
HopCount int // Times this node has been visited
Error error // Set if after-node and node failed
Graph GraphSnapshot // Graph context for navigation
CreatedAt time.Time // When step request was created
}

StepResponse carries the controller’s decision:

// StepResponse is the controller's decision for a single StepRequest.
type StepResponse struct {
RequestID string // Matches StepRequest.ID
Action StepAction // What to do next
ModifiedEnvelope *EnvelopeModification // Optional envelope changes
Meta map[string]any // Optional debugging metadata
}
// StepConfig provides additional step-through configuration and is passed
// via RunOptions.StepConfig. DefaultStepConfig is documented as returning
// PauseBeforeNode=true, PauseAfterNode=false, StepTimeout=0 (no timeout).
type StepConfig struct {
PauseBeforeNode bool // Pause before each node
PauseAfterNode bool // Pause after each node
StepTimeout time.Duration // Timeout for each step decision
}
func DefaultStepConfig() *StepConfig

Default config:

| Option | Default |
| --- | --- |
| `PauseBeforeNode` | true |
| `PauseAfterNode` | false |
| `StepTimeout` | 0 (no timeout) |

CallbackStepController is the simplest controller, intended for custom integrations:

func NewCallbackStepController(callback StepCallback) *CallbackStepController
ctrl := runtime.NewCallbackStepController(func(ctx context.Context, req *runtime.StepRequest) (*runtime.StepResponse, error) {
fmt.Printf("At node: %s (point: %s)\n", req.NodeID, req.Point)
// Continue execution
return &runtime.StepResponse{
RequestID: req.ID,
Action: runtime.StepActionContinue,
}, nil
})
opts := runtime.RunOptions{
StepController: ctrl,
}

ChannelStepController uses Go channels for interactive debugging (useful for CLI tools and testing):

func NewChannelStepController(bufferSize int) *ChannelStepController
| Method | Description |
| --- | --- |
| `Requests() <-chan *StepRequest` | Channel for receiving step requests |
| `Respond(resp *StepResponse) error` | Send response for pending request |
| `SetBreakpoint(nodeID string)` | Add breakpoint on a node |
| `ClearBreakpoint(nodeID string)` | Remove breakpoint from a node |
| `ClearAllBreakpoints()` | Remove all breakpoints |
| `ListPending() []*StepRequest` | List pending step requests |
// Example: interactive debugging with a ChannelStepController.
ctrl := runtime.NewChannelStepController(10)

// Set breakpoints
ctrl.SetBreakpoint("validate")
ctrl.SetBreakpoint("process")

// Run in a goroutine so this goroutine stays free to answer step requests.
go func() {
	result, err := rt.Run(ctx, graph, envelope, runtime.RunOptions{
		StepController: ctrl,
	})
	// Handle result: both values must be used, otherwise the snippet does
	// not compile ("declared and not used").
	if err != nil {
		log.Printf("run failed: %v", err)
		return
	}
	fmt.Println("Output:", result.GetVarString("output"))
}()

// Interactive debugging loop
for req := range ctrl.Requests() {
	fmt.Printf("Paused at %s (vars: %v)\n", req.NodeID, req.Envelope.Vars)

	// Get user input and respond. Respond returns an error, so report it
	// instead of silently dropping it.
	resp := &runtime.StepResponse{
		RequestID: req.ID,
		Action:    runtime.StepActionContinue,
	}
	if err := ctrl.Respond(resp); err != nil {
		log.Printf("respond to step %s: %v", req.ID, err)
	}
}

BreakpointStepController only pauses at specified breakpoints:

func NewBreakpointStepController(handler StepHandler) *BreakpointStepController
| Method | Description |
| --- | --- |
| `AddBreakpoint(nodeID, point)` | Add breakpoint at specific point |
| `AddBreakpointBefore(nodeID)` | Add breakpoint before node |
| `AddBreakpointAfter(nodeID)` | Add breakpoint after node |
| `RemoveBreakpoint(nodeID)` | Remove breakpoint |
| `ClearAllBreakpoints()` | Remove all breakpoints |
| `ListBreakpoints() map[string]StepPoint` | List all breakpoints |
ctrl := runtime.NewBreakpointStepController(func(ctx context.Context, req *runtime.StepRequest) (*runtime.StepResponse, error) {
// Inspect state at breakpoint
log.Printf("Breakpoint hit: %s", req.NodeID)
return &runtime.StepResponse{
RequestID: req.ID,
Action: runtime.StepActionContinue,
}, nil
})
// Add breakpoints
ctrl.AddBreakpointBefore("validate")
ctrl.AddBreakpointAfter("process")

AutoStepController automatically continues with a configurable delay (useful for observing execution flow):

func NewAutoStepController(delay time.Duration) *AutoStepController
| Method | Description |
| --- | --- |
| `WithLogHandler(fn func(*StepRequest))` | Set logging callback |
| `Pause()` | Stop auto-stepping |
| `Resume()` | Continue auto-stepping |
| `SetDelay(d time.Duration)` | Change step delay |
| `SetBreakpoint(nodeID)` | Add breakpoint |
| `ClearBreakpoint(nodeID)` | Remove breakpoint |
ctrl := runtime.NewAutoStepController(500 * time.Millisecond).
WithLogHandler(func(req *runtime.StepRequest) {
fmt.Printf("Step: %s (hop %d)\n", req.NodeID, req.HopCount)
})
// Run with automatic stepping
result, err := rt.Run(ctx, graph, envelope, runtime.RunOptions{
StepController: ctrl,
})

Step controllers can modify the envelope at StepPointBeforeNode:

// EnvelopeModification describes envelope changes a step controller requests
// through StepResponse.ModifiedEnvelope. The surrounding documentation states
// that controllers can modify the envelope at StepPointBeforeNode.
type EnvelopeModification struct {
SetVars map[string]any // Variables to set or update
DeleteVars []string // Variable names to remove
}
ctrl := runtime.NewCallbackStepController(func(ctx context.Context, req *runtime.StepRequest) (*runtime.StepResponse, error) {
// Modify envelope before node executes
return &runtime.StepResponse{
RequestID: req.ID,
Action: runtime.StepActionContinue,
ModifiedEnvelope: &runtime.EnvelopeModification{
SetVars: map[string]any{
"debug_mode": true,
"override_value": "test",
},
DeleteVars: []string{"temporary_var"},
},
}, nil
})

EnvelopeSnapshot is a read-only view of envelope state:

// EnvelopeSnapshot is a read-only view of envelope state, handed to step
// controllers as StepRequest.Envelope.
type EnvelopeSnapshot struct {
Input any
Vars map[string]any // Envelope variables; the step-controller examples print these via req.Envelope.Vars
Artifacts []core.Artifact
Messages []core.Message
Errors []core.NodeError // presumably the node errors recorded so far — TODO confirm
Trace core.TraceInfo
}

GraphSnapshot is a read-only view of graph context:

// GraphSnapshot is a read-only view of graph context, supplied to step
// controllers as StepRequest.Graph for navigation.
// NOTE(review): the string fields below look like node IDs — confirm.
type GraphSnapshot struct {
Name string
CurrentNode string
Successors []string
Predecessors []string
AllNodes []string
}

| Error | Cause |
| --- | --- |
| `ErrMaxHopsExceeded` | Node executed more times than MaxHops allows |
| `ErrRunCanceled` | Context was canceled during execution |
| `ErrNodeExecution` | Node execution failed |
| `ErrStepAborted` | Step controller aborted execution |
| `ErrStepRequestNotFound` | Response sent for unknown step request |

rt := runtime.NewRuntime()
result, err := rt.Run(ctx, myGraph, envelope, runtime.DefaultRunOptions())
if err != nil {
log.Fatal(err)
}
opts := runtime.RunOptions{
EventHandler: func(e runtime.Event) {
switch e.Kind {
case runtime.EventNodeStarted:
log.Printf("Starting: %s", e.NodeID)
case runtime.EventNodeFinished:
log.Printf("Finished: %s (%v)", e.NodeID, e.Elapsed)
case runtime.EventNodeFailed:
log.Printf("Failed: %s - %v", e.NodeID, e.Payload["error"])
}
},
}
result, err := rt.Run(ctx, myGraph, envelope, opts)
// Example: fault-tolerant run. ContinueOnError records errors and continues
// when possible; failures are also surfaced through EventNodeFailed events.
opts := runtime.RunOptions{
	ContinueOnError: true,
	EventHandler: func(e runtime.Event) {
		if e.Kind == runtime.EventNodeFailed {
			log.Printf("Node %s failed but continuing: %v", e.NodeID, e.Payload["error"])
		}
	},
}
result, err := rt.Run(ctx, myGraph, envelope, opts)
if err != nil {
	// Check err before touching result: result may not be usable when Run
	// reports a failure, and an unused err does not compile.
	log.Fatal(err)
}

// Check for accumulated errors
if result.HasErrors() {
	for _, nodeErr := range result.Errors {
		log.Printf("Error in %s: %s", nodeErr.NodeID, nodeErr.Message)
	}
}
opts := runtime.RunOptions{
Concurrency: 8, // Use 8 workers for parallel branches
MaxHops: 200,
}
result, err := rt.Run(ctx, parallelGraph, envelope, opts)
// Example: human-in-the-loop approval using a ChannelStepController.
ctrl := runtime.NewChannelStepController(10)

// Run workflow in background so this goroutine can answer step requests.
go func() {
	result, err := rt.Run(ctx, workflowGraph, envelope, runtime.RunOptions{
		StepController: ctrl,
		StepConfig: &runtime.StepConfig{
			PauseBeforeNode: true,
			PauseAfterNode:  false,
		},
	})
	// Handle completion: both values must be used, otherwise the snippet
	// does not compile ("declared and not used").
	if err != nil {
		log.Printf("workflow failed: %v", err)
		return
	}
	fmt.Println("Output:", result.GetVarString("output"))
}()

// Interactive approval loop
for req := range ctrl.Requests() {
	// Show state to user
	fmt.Printf("Node: %s\nVars: %v\n", req.NodeID, req.Envelope.Vars)

	// Get approval: abort unless the user approves this step.
	action := runtime.StepActionAbort
	if getUserApproval() {
		action = runtime.StepActionContinue
	}

	// Respond returns an error, so report it instead of dropping it.
	resp := &runtime.StepResponse{
		RequestID: req.ID,
		Action:    action,
	}
	if err := ctrl.Respond(resp); err != nil {
		log.Printf("respond to step %s: %v", req.ID, err)
	}
}
now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
opts := runtime.NewTimedRunOptions(func() time.Time {
return now
})
result, err := rt.Run(ctx, myGraph, envelope, opts)

| Practice | Recommendation |
| --- | --- |
| MaxHops | Set appropriately for your graph; use higher values for graphs with intentional cycles |
| ContinueOnError | Enable for fault-tolerant workflows; check result.Errors after completion |
| Concurrency | Match to parallel branch count; higher values increase parallelism |
| Events | Use events for observability; avoid heavy processing in handlers |
| Step control | Use BreakpointStepController for debugging; AutoStepController for demos |
| Envelope modification | Only modify at StepPointBeforeNode; keep changes minimal |