Petal Flow Runtime API
runtime
The runtime package provides the execution engine for PetalFlow workflow graphs.

    import "github.com/petal-labs/petalflow/runtime"

Or use the top-level re-exports:

    import "github.com/petal-labs/petalflow"

Type Reference

| Type | Purpose |
|---|---|
| Runtime | Interface for executing graphs |
| BasicRuntime | Standard runtime implementation |
| RunOptions | Runtime configuration options |
| Event | Structured execution event |
| EventKind | Event type identifier |
| EventHandler | Function for handling events |
| EventEmitter | Function for emitting events |
| StepController | Interface for step-through debugging |
| StepConfig | Step-through configuration |
| StepRequest | Step point information |
| StepResponse | Controller decision |

Runtime Interface
The Runtime interface defines the contract for graph execution engines.

    type Runtime interface {
        // Run executes the graph with the given initial envelope.
        Run(ctx context.Context, g graph.Graph, env *core.Envelope, opts RunOptions) (*core.Envelope, error)

        // Events returns a channel for receiving runtime events.
        // The channel is closed when the run completes.
        Events() <-chan Event
    }

BasicRuntime
The standard implementation of the Runtime interface. Supports both sequential and parallel execution.
Constructor

    func NewRuntime() *BasicRuntime

Example

    rt := runtime.NewRuntime()

    // Execute a graph
    result, err := rt.Run(ctx, myGraph, envelope, runtime.DefaultRunOptions())
    if err != nil {
        log.Fatal(err)
    }

    // Process result
    fmt.Println("Output:", result.GetVarString("output"))

Concurrent Execution

    // Enable parallel branch execution
    opts := runtime.RunOptions{
        MaxHops:     100,
        Concurrency: 4, // 4 parallel workers
    }

    result, err := rt.Run(ctx, myGraph, envelope, opts)

RunOptions
Controls execution behavior for a graph run.

    type RunOptions struct {
        // MaxHops protects against infinite cycles (default: 100).
        MaxHops int

        // ContinueOnError records errors and continues when possible.
        ContinueOnError bool

        // Concurrency sets the worker pool size for parallel execution (default: 1).
        Concurrency int

        // Now provides the current time (for testing). If nil, uses time.Now.
        Now func() time.Time

        // EventHandler receives events during execution.
        EventHandler EventHandler

        // EventEmitterDecorator wraps the internal event emitter.
        EventEmitterDecorator EventEmitterDecorator

        // StepController enables step-through debugging.
        StepController StepController

        // StepConfig provides additional step-through configuration.
        StepConfig *StepConfig

        // EventBus distributes events to subscribers.
        EventBus EventPublisher
    }

Default Options

    func DefaultRunOptions() RunOptions

Returns options with the following defaults:

| Option | Default Value |
|---|---|
| MaxHops | 100 |
| ContinueOnError | false |
| Concurrency | 1 |
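
The reference does not say whether a hand-written RunOptions literal with zero-valued fields is normalized to these defaults. A caller who wants them explicitly can start from the defaults and override individual fields. The sketch below illustrates that pattern with local stand-in types: `runOptions`, `defaultRunOptions`, and `withDefaults` are hypothetical names used only for this example and are not part of the package.

```go
package main

import "fmt"

// runOptions is a local stand-in mirroring three documented RunOptions fields.
// It is not the real petalflow type.
type runOptions struct {
	MaxHops         int
	ContinueOnError bool
	Concurrency     int
}

// defaultRunOptions mirrors the documented defaults table.
func defaultRunOptions() runOptions {
	return runOptions{MaxHops: 100, ContinueOnError: false, Concurrency: 1}
}

// withDefaults is a hypothetical helper: it fills non-positive numeric fields
// from the defaults and leaves explicitly set fields untouched.
func withDefaults(o runOptions) runOptions {
	d := defaultRunOptions()
	if o.MaxHops <= 0 {
		o.MaxHops = d.MaxHops
	}
	if o.Concurrency <= 0 {
		o.Concurrency = d.Concurrency
	}
	return o
}

func main() {
	// Pattern 1: start from the defaults, then override one field.
	opts := defaultRunOptions()
	opts.Concurrency = 4
	fmt.Printf("%+v\n", opts)

	// Pattern 2: normalize a partially filled literal.
	fmt.Printf("%+v\n", withDefaults(runOptions{ContinueOnError: true}))
}
```

With the real package, the first pattern would presumably read `opts := runtime.DefaultRunOptions()` followed by field assignments.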
Convenience Constructors

    // Create options with a step controller
    func NewStepRunOptions(controller StepController) RunOptions

    // Create options with a custom time provider (for testing)
    func NewTimedRunOptions(now func() time.Time) RunOptions

Example

    opts := runtime.RunOptions{
        MaxHops:         200,
        ContinueOnError: true,
        Concurrency:     4,
        EventHandler: func(e runtime.Event) {
            log.Printf("[%s] %s: %s", e.Kind, e.NodeID, e.Elapsed)
        },
    }

    result, err := rt.Run(ctx, graph, envelope, opts)

Events
EventKind
Identifies the type of event emitted by the runtime.

    type EventKind string

    const (
        // Run lifecycle
        EventRunStarted  EventKind = "run.started"
        EventRunFinished EventKind = "run.finished"
        EventRunSnapshot EventKind = "run.snapshot"

        // Node lifecycle
        EventNodeStarted  EventKind = "node.started"
        EventNodeFinished EventKind = "node.finished"
        EventNodeFailed   EventKind = "node.failed"

        // Node output
        EventNodeOutput        EventKind = "node.output"
        EventNodeOutputDelta   EventKind = "node.output.delta"
        EventNodeOutputFinal   EventKind = "node.output.final"
        EventNodeOutputPreview EventKind = "node.output.preview"

        // Routing
        EventRouteDecision EventKind = "route.decision"

        // Tools
        EventToolCall   EventKind = "tool.call"
        EventToolResult EventKind = "tool.result"

        // Step control
        EventStepPaused  EventKind = "step.paused"
        EventStepResumed EventKind = "step.resumed"
        EventStepSkipped EventKind = "step.skipped"
        EventStepAborted EventKind = "step.aborted"
    )

Event Structure

    type Event struct {
        Kind     EventKind      // Event type identifier
        RunID    string         // Unique run identifier
        NodeID   string         // Node that produced this event (empty for run-level)
        NodeKind core.NodeKind  // Kind of node (empty for run-level)
        Time     time.Time      // When the event occurred
        Attempt  int            // Attempt number (1-indexed) for retry scenarios
        Elapsed  time.Duration  // Duration since run or node started
        Payload  map[string]any // Event-specific data
        Seq      uint64         // Monotonic sequence number per run
        TraceID  string         // OpenTelemetry trace ID (hex-encoded)
        SpanID   string         // OpenTelemetry span ID (hex-encoded)
    }

Event Constructor

    func NewEvent(kind EventKind, runID string) Event

Fluent Methods

| Method | Description |
|---|---|
| WithNode(nodeID, nodeKind) Event | Sets node information |
| WithAttempt(attempt) Event | Sets attempt number |
| WithElapsed(elapsed) Event | Sets elapsed duration |
| WithPayload(key, value) Event | Adds payload key-value pair |

Example

    event := runtime.NewEvent(runtime.EventNodeFinished, runID).
        WithNode("process", core.NodeKindTransform).
        WithElapsed(150 * time.Millisecond).
        WithPayload("items_processed", 42)

Event Handlers
EventHandler

    type EventHandler func(Event)

EventEmitter

    type EventEmitter func(Event)

EventEmitterDecorator
Wraps an emitter to add cross-cutting behavior (e.g., trace metadata):

    type EventEmitterDecorator func(EventEmitter) EventEmitter

EventPublisher
Interface for distributing events to external subscribers:

    type EventPublisher interface {
        Publish(event Event)
    }

Helper Functions

    // Combines multiple handlers into one
    func MultiEventHandler(handlers ...EventHandler) EventHandler

    // Returns a handler that sends events to a channel
    func ChannelEventHandler(ch chan<- Event) EventHandler

Example

    // Create a combined handler
    handler := runtime.MultiEventHandler(
        func(e runtime.Event) {
            log.Printf("Event: %s", e.Kind)
        },
        func(e runtime.Event) {
            metrics.RecordEvent(e.Kind, e.Elapsed)
        },
    )

    opts := runtime.RunOptions{
        EventHandler: handler,
    }

Step Controller
The step controller system enables debugging and human-in-the-loop workflows.
StepController Interface

    type StepController interface {
        // Step is called at each step point and blocks until a decision is made.
        Step(ctx context.Context, req *StepRequest) (*StepResponse, error)

        // ShouldPause returns true if execution should pause at this point.
        ShouldPause(nodeID string, point StepPoint) bool
    }

StepAction

    type StepAction string

    const (
        StepActionContinue        StepAction = "continue"          // Proceed to next step
        StepActionSkipNode        StepAction = "skip"              // Skip current node
        StepActionAbort           StepAction = "abort"             // Stop execution
        StepActionRunToBreakpoint StepAction = "run_to_breakpoint" // Continue until breakpoint
    )

StepPoint

    type StepPoint string

    const (
        StepPointBeforeNode StepPoint = "before_node" // Before node executes
        StepPointAfterNode  StepPoint = "after_node"  // After node executes
    )

StepRequest
Information provided to the controller at each step point:

    type StepRequest struct {
        ID        string            // Unique step identifier
        RunID     string            // Workflow run identifier
        Point     StepPoint         // When this pause occurred
        NodeID    string            // ID of the node
        NodeKind  core.NodeKind     // Type of node
        Envelope  *EnvelopeSnapshot // Read-only state snapshot
        HopCount  int               // Times this node has been visited
        Error     error             // Set if after-node and node failed
        Graph     GraphSnapshot     // Graph context for navigation
        CreatedAt time.Time         // When step request was created
    }

StepResponse
Controller’s decision:

    type StepResponse struct {
        RequestID        string                // Matches StepRequest.ID
        Action           StepAction            // What to do next
        ModifiedEnvelope *EnvelopeModification // Optional envelope changes
        Meta             map[string]any        // Optional debugging metadata
    }

StepConfig

    type StepConfig struct {
        PauseBeforeNode bool          // Pause before each node
        PauseAfterNode  bool          // Pause after each node
        StepTimeout     time.Duration // Timeout for each step decision
    }

    func DefaultStepConfig() *StepConfig

Default config:

| Option | Default |
|---|---|
| PauseBeforeNode | true |
| PauseAfterNode | false |
| StepTimeout | 0 (no timeout) |
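
StepTimeout bounds how long a step decision may take, with 0 meaning no limit. A common way to get that behavior in Go is to derive a context with a deadline before calling the controller. The sketch below shows the idea with local stand-ins: `stepFunc`, `stepWithTimeout`, `slowController`, and `tryStep` are hypothetical names, and this is not necessarily how BasicRuntime applies the timeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stepFunc is a stand-in for a controller's Step method; it returns an action name.
type stepFunc func(ctx context.Context) (string, error)

// stepWithTimeout calls step with a deadline when timeout > 0.
// A zero timeout leaves the parent context unchanged (no per-step limit).
func stepWithTimeout(parent context.Context, timeout time.Duration, step stepFunc) (string, error) {
	ctx := parent
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(parent, timeout)
		defer cancel()
	}
	return step(ctx)
}

// slowController simulates a person who answers after delay, unless ctx ends first.
func slowController(delay time.Duration) stepFunc {
	return func(ctx context.Context) (string, error) {
		select {
		case <-time.After(delay):
			return "continue", nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// tryStep runs one simulated step (arguments in milliseconds) and reports
// the chosen action and whether the decision timed out.
func tryStep(timeoutMs, delayMs int) (string, bool) {
	timeout := time.Duration(timeoutMs) * time.Millisecond
	delay := time.Duration(delayMs) * time.Millisecond
	action, err := stepWithTimeout(context.Background(), timeout, slowController(delay))
	return action, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	// A 10 ms limit against a controller that needs 1 s.
	action, timedOut := tryStep(10, 1000)
	fmt.Printf("action=%q timedOut=%v\n", action, timedOut)

	// Timeout 0: wait as long as the controller needs.
	action, timedOut = tryStep(0, 5)
	fmt.Printf("action=%q timedOut=%v\n", action, timedOut)
}
```

A controller that respects `ctx.Done()` in its Step method, as `slowController` does here, is what makes any such timeout effective.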
Step Controller Implementations
CallbackStepController
Simplest controller for custom integrations:

    func NewCallbackStepController(callback StepCallback) *CallbackStepController

    ctrl := runtime.NewCallbackStepController(func(ctx context.Context, req *runtime.StepRequest) (*runtime.StepResponse, error) {
        fmt.Printf("At node: %s (point: %s)\n", req.NodeID, req.Point)

        // Continue execution
        return &runtime.StepResponse{
            RequestID: req.ID,
            Action:    runtime.StepActionContinue,
        }, nil
    })

    opts := runtime.RunOptions{
        StepController: ctrl,
    }

ChannelStepController
Uses Go channels for interactive debugging (useful for CLI tools and testing):

    func NewChannelStepController(bufferSize int) *ChannelStepController

| Method | Description |
|---|---|
| Requests() <-chan *StepRequest | Channel for receiving step requests |
| Respond(resp *StepResponse) error | Send response for pending request |
| SetBreakpoint(nodeID string) | Add breakpoint on a node |
| ClearBreakpoint(nodeID string) | Remove breakpoint from a node |
| ClearAllBreakpoints() | Remove all breakpoints |
| ListPending() []*StepRequest | List pending step requests |

    ctrl := runtime.NewChannelStepController(10)

    // Set breakpoints
    ctrl.SetBreakpoint("validate")
    ctrl.SetBreakpoint("process")

    // Run in a goroutine
    go func() {
        result, err := rt.Run(ctx, graph, envelope, runtime.RunOptions{
            StepController: ctrl,
        })
        if err != nil {
            log.Printf("run failed: %v", err)
            return
        }
        // Handle result
        fmt.Println("Output:", result.GetVarString("output"))
    }()

    // Interactive debugging loop
    for req := range ctrl.Requests() {
        fmt.Printf("Paused at %s (vars: %v)\n", req.NodeID, req.Envelope.Vars)

        // Get user input and respond
        if err := ctrl.Respond(&runtime.StepResponse{
            RequestID: req.ID,
            Action:    runtime.StepActionContinue,
        }); err != nil {
            log.Printf("respond failed: %v", err)
        }
    }

BreakpointStepController
Only pauses at specified breakpoints:

    func NewBreakpointStepController(handler StepHandler) *BreakpointStepController

| Method | Description |
|---|---|
| AddBreakpoint(nodeID, point) | Add breakpoint at specific point |
| AddBreakpointBefore(nodeID) | Add breakpoint before node |
| AddBreakpointAfter(nodeID) | Add breakpoint after node |
| RemoveBreakpoint(nodeID) | Remove breakpoint |
| ClearAllBreakpoints() | Remove all breakpoints |
| ListBreakpoints() map[string]StepPoint | List all breakpoints |

    ctrl := runtime.NewBreakpointStepController(func(ctx context.Context, req *runtime.StepRequest) (*runtime.StepResponse, error) {
        // Inspect state at breakpoint
        log.Printf("Breakpoint hit: %s", req.NodeID)

        return &runtime.StepResponse{
            RequestID: req.ID,
            Action:    runtime.StepActionContinue,
        }, nil
    })

    // Add breakpoints
    ctrl.AddBreakpointBefore("validate")
    ctrl.AddBreakpointAfter("process")

AutoStepController
Automatically continues after a configurable delay (useful for observing execution flow):

    func NewAutoStepController(delay time.Duration) *AutoStepController

| Method | Description |
|---|---|
| WithLogHandler(fn func(*StepRequest)) | Set logging callback |
| Pause() | Stop auto-stepping |
| Resume() | Continue auto-stepping |
| SetDelay(d time.Duration) | Change step delay |
| SetBreakpoint(nodeID) | Add breakpoint |
| ClearBreakpoint(nodeID) | Remove breakpoint |

    ctrl := runtime.NewAutoStepController(500 * time.Millisecond).
        WithLogHandler(func(req *runtime.StepRequest) {
            fmt.Printf("Step: %s (hop %d)\n", req.NodeID, req.HopCount)
        })

    // Run with automatic stepping
    result, err := rt.Run(ctx, graph, envelope, runtime.RunOptions{
        StepController: ctrl,
    })

Envelope Modification
Step controllers can modify the envelope at StepPointBeforeNode:

    type EnvelopeModification struct {
        SetVars    map[string]any // Variables to set or update
        DeleteVars []string       // Variable names to remove
    }

Example

    ctrl := runtime.NewCallbackStepController(func(ctx context.Context, req *runtime.StepRequest) (*runtime.StepResponse, error) {
        // Modify envelope before node executes
        return &runtime.StepResponse{
            RequestID: req.ID,
            Action:    runtime.StepActionContinue,
            ModifiedEnvelope: &runtime.EnvelopeModification{
                SetVars: map[string]any{
                    "debug_mode":     true,
                    "override_value": "test",
                },
                DeleteVars: []string{"temporary_var"},
            },
        }, nil
    })

Snapshot Types
Section titled “Snapshot Types”EnvelopeSnapshot
Read-only view of envelope state:

    type EnvelopeSnapshot struct {
        Input     any
        Vars      map[string]any
        Artifacts []core.Artifact
        Messages  []core.Message
        Errors    []core.NodeError
        Trace     core.TraceInfo
    }

GraphSnapshot
Read-only graph context:

    type GraphSnapshot struct {
        Name         string
        CurrentNode  string
        Successors   []string
        Predecessors []string
        AllNodes     []string
    }

Errors

| Error | Cause |
|---|---|
| ErrMaxHopsExceeded | Node executed more times than MaxHops allows |
| ErrRunCanceled | Context was canceled during execution |
| ErrNodeExecution | Node execution failed |
| ErrStepAborted | Step controller aborted execution |
| ErrStepRequestNotFound | Response sent for unknown step request |
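
Callers usually branch on which of these errors a run returned. The sketch below assumes the errors are sentinel values that can be matched with `errors.Is` even when wrapped, which the table suggests but does not state. It uses a local stand-in `errMaxHopsExceeded` and hypothetical `walk` and `hitHopLimit` functions that mimic a per-node hop limit on a cyclic graph. It illustrates the matching pattern only and is not PetalFlow's scheduler.

```go
package main

import (
	"errors"
	"fmt"
)

// errMaxHopsExceeded is a local stand-in for runtime.ErrMaxHopsExceeded.
var errMaxHopsExceeded = errors.New("max hops exceeded")

// walk follows next[node] from start until it reaches a node with no successor.
// It fails once any single node has been visited more than maxHops times and
// returns the number of node visits completed before stopping.
func walk(next map[string]string, start string, maxHops int) (int, error) {
	hops := make(map[string]int)
	visited := 0
	for node, ok := start, true; ok; node, ok = next[node] {
		hops[node]++
		if hops[node] > maxHops {
			// Wrap the sentinel so callers can still match it with errors.Is.
			return visited, fmt.Errorf("node %q: %w", node, errMaxHopsExceeded)
		}
		visited++
	}
	return visited, nil
}

// hitHopLimit reports whether err is, or wraps, the hop-limit sentinel.
func hitHopLimit(err error) bool {
	return errors.Is(err, errMaxHopsExceeded)
}

func main() {
	// a -> b -> a loops forever without a hop limit.
	cyclic := map[string]string{"a": "b", "b": "a"}
	if _, err := walk(cyclic, "a", 3); hitHopLimit(err) {
		fmt.Println("stopped a runaway cycle:", err)
	}

	// a -> b -> c terminates on its own.
	linear := map[string]string{"a": "b", "b": "c"}
	n, err := walk(linear, "a", 3)
	fmt.Println("visited:", n, "err:", err)
}
```

Under the same assumption, a check against the real package would presumably read `errors.Is(err, runtime.ErrMaxHopsExceeded)`.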
Execution Patterns
Basic Execution

    rt := runtime.NewRuntime()

    result, err := rt.Run(ctx, myGraph, envelope, runtime.DefaultRunOptions())
    if err != nil {
        log.Fatal(err)
    }

With Event Logging

    opts := runtime.RunOptions{
        EventHandler: func(e runtime.Event) {
            switch e.Kind {
            case runtime.EventNodeStarted:
                log.Printf("Starting: %s", e.NodeID)
            case runtime.EventNodeFinished:
                log.Printf("Finished: %s (%v)", e.NodeID, e.Elapsed)
            case runtime.EventNodeFailed:
                log.Printf("Failed: %s - %v", e.NodeID, e.Payload["error"])
            }
        },
    }

    result, err := rt.Run(ctx, myGraph, envelope, opts)

With Error Recovery

    opts := runtime.RunOptions{
        ContinueOnError: true,
        EventHandler: func(e runtime.Event) {
            if e.Kind == runtime.EventNodeFailed {
                log.Printf("Node %s failed but continuing: %v", e.NodeID, e.Payload["error"])
            }
        },
    }

    result, err := rt.Run(ctx, myGraph, envelope, opts)
    if err != nil {
        log.Fatal(err)
    }

    // Check for accumulated errors
    if result.HasErrors() {
        for _, nodeErr := range result.Errors {
            log.Printf("Error in %s: %s", nodeErr.NodeID, nodeErr.Message)
        }
    }

Parallel Execution

    opts := runtime.RunOptions{
        Concurrency: 8, // Use 8 workers for parallel branches
        MaxHops:     200,
    }

    result, err := rt.Run(ctx, parallelGraph, envelope, opts)

Human-in-the-Loop

    ctrl := runtime.NewChannelStepController(10)

    // Run workflow in background
    go func() {
        result, err := rt.Run(ctx, workflowGraph, envelope, runtime.RunOptions{
            StepController: ctrl,
            StepConfig: &runtime.StepConfig{
                PauseBeforeNode: true,
                PauseAfterNode:  false,
            },
        })
        if err != nil {
            log.Printf("run failed: %v", err)
            return
        }
        // Handle completion
        fmt.Println("Output:", result.GetVarString("output"))
    }()

    // Interactive approval loop
    for req := range ctrl.Requests() {
        // Show state to user
        fmt.Printf("Node: %s\nVars: %v\n", req.NodeID, req.Envelope.Vars)

        // Get approval (getUserApproval is an application-defined helper)
        if approved := getUserApproval(); approved {
            ctrl.Respond(&runtime.StepResponse{
                RequestID: req.ID,
                Action:    runtime.StepActionContinue,
            })
        } else {
            ctrl.Respond(&runtime.StepResponse{
                RequestID: req.ID,
                Action:    runtime.StepActionAbort,
            })
        }
    }

Testing with Time Control

    now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

    opts := runtime.NewTimedRunOptions(func() time.Time {
        return now
    })

    result, err := rt.Run(ctx, myGraph, envelope, opts)

Best Practices

| Practice | Recommendation |
|---|---|
| MaxHops | Set appropriately for your graph; use higher values for graphs with intentional cycles |
| ContinueOnError | Enable for fault-tolerant workflows; check result.Errors after completion |
| Concurrency | Match to parallel branch count; higher values increase parallelism |
| Events | Use events for observability; avoid heavy processing in handlers |
| Step control | Use BreakpointStepController for debugging; AutoStepController for demos |
| Envelope modification | Only modify at StepPointBeforeNode; keep changes minimal |
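
To keep event handlers light, as the table recommends, one option is to hand events to a buffered channel and do the expensive work in a separate goroutine. The sketch below uses a local `event` stand-in and a hypothetical `asyncHandler` type. Dropping events when the buffer is full is a design choice made here for illustration, not documented PetalFlow behavior.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// event is a local stand-in for runtime.Event with only the fields used here.
type event struct {
	Kind   string
	NodeID string
}

// asyncHandler queues events for a background worker so that the code
// emitting events is never blocked by slow processing.
type asyncHandler struct {
	dropped int64 // accessed atomically; kept first for 64-bit alignment
	queue   chan event
	done    sync.WaitGroup
}

// newAsyncHandler starts one worker that calls process for each queued event.
func newAsyncHandler(buffer int, process func(event)) *asyncHandler {
	h := &asyncHandler{queue: make(chan event, buffer)}
	h.done.Add(1)
	go func() {
		defer h.done.Done()
		for e := range h.queue {
			process(e) // slow work happens off the caller's goroutine
		}
	}()
	return h
}

// Handle has the same shape as the documented EventHandler, func(Event).
// It enqueues without blocking and counts the event as dropped if the buffer is full.
func (h *asyncHandler) Handle(e event) {
	select {
	case h.queue <- e:
	default:
		atomic.AddInt64(&h.dropped, 1)
	}
}

// Close waits for the worker to drain the queue and returns the drop count.
// Handle must not be called after Close.
func (h *asyncHandler) Close() int64 {
	close(h.queue)
	h.done.Wait()
	return atomic.LoadInt64(&h.dropped)
}

func main() {
	processed := 0
	h := newAsyncHandler(16, func(e event) { processed++ })

	for i := 0; i < 5; i++ {
		h.Handle(event{Kind: "node.finished", NodeID: fmt.Sprintf("n%d", i)})
	}

	dropped := h.Close() // safe to read processed after Close returns
	fmt.Println("processed:", processed, "dropped:", dropped)
}
```

With the real package, the method value `h.Handle` (adapted to take `runtime.Event`) could be assigned to `RunOptions.EventHandler`. Whether to drop, block, or grow the buffer when the worker falls behind depends on how much event loss the application can tolerate.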