Petal Flow Concepts

Petal Flow is built around four core primitives: Graph, Node, Envelope, and Runtime.

Graph

Directed node topology with entrypoint and edges.

Node

Unit of execution that transforms envelope state.

Envelope

Shared run state (Vars, Messages, Artifacts, Trace).

Runtime

Executes graph, emits events, applies step control.

A graph defines execution flow by connecting nodes with directed edges.

g := petalflow.NewGraph("support-flow")
_ = g.AddNode(validateNode)
_ = g.AddNode(routeNode)
_ = g.AddNode(handleNode)
_ = g.AddEdge("validate", "route")
_ = g.AddEdge("route", "handle")
_ = g.SetEntry("validate")

For fluent composition, use graph.NewGraphBuilder(...), whose methods include AddNode, Edge, FanOut, Merge, Conditional, and Build.
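
To make the topology idea concrete, the following is a minimal, self-contained sketch that does not use the Petal Flow API. The `miniGraph` type, its `Walk` method, and `buildSupportFlow` are hypothetical names introduced only for illustration; they mimic AddNode, AddEdge, and SetEntry to show how an entrypoint plus directed edges determine the visit order. It also checks each returned error instead of discarding it.

```go
package main

import "fmt"

// miniGraph is a hypothetical stand-in for a graph; it is NOT the Petal Flow type.
type miniGraph struct {
	nodes map[string]bool
	edges map[string][]string
	entry string
}

func newMiniGraph() *miniGraph {
	return &miniGraph{nodes: map[string]bool{}, edges: map[string][]string{}}
}

// AddNode registers a node ID and rejects duplicates.
func (g *miniGraph) AddNode(id string) error {
	if g.nodes[id] {
		return fmt.Errorf("duplicate node %q", id)
	}
	g.nodes[id] = true
	return nil
}

// AddEdge connects two registered nodes with a directed edge.
func (g *miniGraph) AddEdge(from, to string) error {
	if !g.nodes[from] || !g.nodes[to] {
		return fmt.Errorf("edge %s -> %s references an unknown node", from, to)
	}
	g.edges[from] = append(g.edges[from], to)
	return nil
}

// SetEntry marks the node where a walk starts.
func (g *miniGraph) SetEntry(id string) error {
	if !g.nodes[id] {
		return fmt.Errorf("entry node %q not found", id)
	}
	g.entry = id
	return nil
}

// Walk returns node IDs in breadth-first order starting at the entry node.
func (g *miniGraph) Walk() []string {
	if g.entry == "" {
		return nil
	}
	seen := map[string]bool{g.entry: true}
	queue := []string{g.entry}
	var order []string
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		order = append(order, id)
		for _, next := range g.edges[id] {
			if !seen[next] {
				seen[next] = true
				queue = append(queue, next)
			}
		}
	}
	return order
}

// buildSupportFlow mirrors the validate -> route -> handle example.
func buildSupportFlow() (*miniGraph, error) {
	g := newMiniGraph()
	for _, id := range []string{"validate", "route", "handle"} {
		if err := g.AddNode(id); err != nil {
			return nil, err
		}
	}
	if err := g.AddEdge("validate", "route"); err != nil {
		return nil, err
	}
	if err := g.AddEdge("route", "handle"); err != nil {
		return nil, err
	}
	if err := g.SetEntry("validate"); err != nil {
		return nil, err
	}
	return g, nil
}

func main() {
	g, err := buildSupportFlow()
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	fmt.Println(g.Walk()) // prints [validate route handle]
}
```

Whether the real graph validates edges at AddEdge time or later is not stated here; the sketch validates eagerly only to keep the example short.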

Nodes implement a small interface:

type Node interface {
	ID() string
	Kind() NodeKind
	Run(ctx context.Context, env *Envelope) (*Envelope, error)
}

Built-in node types, grouped by purpose:

  • LLM & Tools: LLMNode, ToolNode
  • Routing & Control: RuleRouter, LLMRouter, GateNode, MergeNode, MapNode, HumanNode
  • Data shaping: TransformNode, FilterNode, CacheNode
  • Validation & Safety: GuardianNode
  • Webhooks: WebhookTriggerNode, WebhookCallNode

A custom node can embed core.BaseNode and implement Run:

// SentimentNode labels the "input" var with a sentiment and a score.
type SentimentNode struct {
	core.BaseNode
	client *nlp.Client
}

func NewSentimentNode(id string, client *nlp.Client) *SentimentNode {
	return &SentimentNode{
		BaseNode: core.NewBaseNode(id, core.NodeKindTransform),
		client:   client,
	}
}

func (n *SentimentNode) Run(ctx context.Context, env *core.Envelope) (*core.Envelope, error) {
	// Read the text to analyze from the envelope.
	text := env.GetVarString("input")
	result, err := n.client.AnalyzeSentiment(ctx, text)
	if err != nil {
		return nil, err
	}
	// Write the results back so downstream nodes can use them.
	env.SetVar("sentiment", result.Label)
	env.SetVar("score", result.Score)
	return env, nil
}

The envelope is the state container passed across nodes.

env := petalflow.NewEnvelope().
	WithVar("ticket_id", "TKT-123").
	WithVar("message", "I was charged twice")

// Read/write vars
if v, ok := env.GetVar("ticket_id"); ok {
	fmt.Println("ticket:", v)
}
env.SetVar("priority", "high")

// Message and artifact streams
env.AppendMessage(petalflow.Message{Role: "user", Content: "help"})
env.AppendArtifact(petalflow.Artifact{ID: "doc-1", Type: "citation", Text: "..."})

Use Clone() to give each branch its own copy of the envelope, so that parallel branches do not overwrite each other's state.
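
The reason for cloning can be shown with a small, self-contained sketch. The `miniEnvelope` type and `runBranches` function below are hypothetical and hold only a Vars map; they are not the Petal Flow envelope, and how deep the real Clone() copies is not specified here. The sketch assumes a copy of the map is enough for the branches to write independently.

```go
package main

import (
	"fmt"
	"sync"
)

// miniEnvelope is a hypothetical, simplified envelope that holds only Vars.
type miniEnvelope struct {
	Vars map[string]any
}

// Clone copies the Vars map so that writes to the copy do not touch the
// original. Values are copied shallowly, which is an assumption of this sketch.
func (e *miniEnvelope) Clone() *miniEnvelope {
	out := &miniEnvelope{Vars: make(map[string]any, len(e.Vars))}
	for k, v := range e.Vars {
		out.Vars[k] = v
	}
	return out
}

// runBranches starts one goroutine per priority. Each branch works on its
// own clone, so no two goroutines write to the same map.
func runBranches(base *miniEnvelope, priorities []string) []*miniEnvelope {
	results := make([]*miniEnvelope, len(priorities))
	var wg sync.WaitGroup
	for i, p := range priorities {
		wg.Add(1)
		go func(i int, p string) {
			defer wg.Done()
			branch := base.Clone()
			branch.Vars["priority"] = p
			results[i] = branch // each goroutine writes a distinct index
		}(i, p)
	}
	wg.Wait()
	return results
}

func main() {
	base := &miniEnvelope{Vars: map[string]any{"ticket_id": "TKT-123"}}
	for _, b := range runBranches(base, []string{"high", "low"}) {
		fmt.Println(b.Vars["ticket_id"], b.Vars["priority"])
	}
	_, mutated := base.Vars["priority"]
	fmt.Println("base mutated:", mutated) // prints base mutated: false
}
```

Without the clone, both goroutines would write to the same map, which is a data race in Go.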

The runtime executes a graph and emits structured events.

opts := petalflow.DefaultRunOptions()
opts.Concurrency = 4
opts.EventHandler = func(e petalflow.Event) {
	switch e.Kind {
	case petalflow.EventRunStarted:
		fmt.Println("run started")
	case petalflow.EventNodeStarted:
		fmt.Printf("node start: %s\n", e.NodeID)
	case petalflow.EventNodeFinished:
		fmt.Printf("node done: %s (%v)\n", e.NodeID, e.Elapsed)
	case petalflow.EventNodeFailed:
		fmt.Printf("node failed: %s (%v)\n", e.NodeID, e.Payload["error"])
	case petalflow.EventRunFinished:
		fmt.Printf("run finished: %v\n", e.Payload["status"])
	}
}

// g is the graph built earlier; env is the initial envelope.
result, err := petalflow.NewRuntime().Run(ctx, g, env, opts)
if err != nil {
	log.Fatalf("run failed: %v", err)
}
_ = result // the final run result

The runtime emits these event kinds:

  • Run lifecycle: run.started, run.finished
  • Node lifecycle: node.started, node.finished, node.failed
  • Routing/tools/streaming: route.decision, tool.call, tool.result, node.output.*
  • Step control: step.paused, step.resumed, step.skipped, step.aborted
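
As a rough illustration of how the run and node lifecycle events relate, here is a self-contained sketch of a sequential runner that reports events to a handler. The `event`, `step`, `runSteps`, and `errNoRule` names are hypothetical; only the event kind strings come from the list above. Emitting run.finished after a failure is an assumption of this sketch, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical, reduced event carrying only what the sketch needs.
type event struct {
	Kind   string
	NodeID string
	Err    error
}

// step is a hypothetical unit of work identified by a node ID.
type step struct {
	ID  string
	Run func() error
}

// errNoRule is a sample failure used by the demo below.
var errNoRule = errors.New("no matching rule")

// runSteps executes steps in order, reports lifecycle events to handler,
// and stops at the first failing step.
func runSteps(steps []step, handler func(event)) error {
	handler(event{Kind: "run.started"})
	for _, s := range steps {
		handler(event{Kind: "node.started", NodeID: s.ID})
		if err := s.Run(); err != nil {
			handler(event{Kind: "node.failed", NodeID: s.ID, Err: err})
			// Assumption: the run still reports that it finished.
			handler(event{Kind: "run.finished", Err: err})
			return err
		}
		handler(event{Kind: "node.finished", NodeID: s.ID})
	}
	handler(event{Kind: "run.finished"})
	return nil
}

func main() {
	steps := []step{
		{ID: "validate", Run: func() error { return nil }},
		{ID: "route", Run: func() error { return errNoRule }},
		{ID: "handle", Run: func() error { return nil }}, // never reached
	}
	err := runSteps(steps, func(e event) {
		fmt.Println(e.Kind, e.NodeID)
	})
	fmt.Println("run error:", err)
}
```

In this sketch every node.started is followed by exactly one node.finished or node.failed for the same node, which is the property an event consumer such as a progress display would rely on.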

Step controllers enable debug pauses and human-in-the-loop tooling.

ctrl := runtime.NewChannelStepController(10)
ctrl.SetBreakpoint("review")
opts := petalflow.DefaultRunOptions()
opts.StepController = ctrl
opts.StepConfig = runtime.DefaultStepConfig()
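
The pause-and-decide idea behind a channel-based step controller can be sketched without the library. Everything below (`channelController`, `stepRequest`, the `action` values, `runWithController`) is hypothetical and is not the behavior or API of runtime.NewChannelStepController; it only shows one way a runner could block at a breakpoint until another goroutine replies.

```go
package main

import "fmt"

// action is the decision returned for a paused step.
type action int

const (
	actionContinue action = iota
	actionSkip
	actionAbort
)

// stepRequest is sent when the runner pauses before a node.
type stepRequest struct {
	NodeID string
	Reply  chan action
}

// channelController is a hypothetical controller that pauses at breakpoints.
type channelController struct {
	breakpoints map[string]bool
	Requests    chan stepRequest
}

func newChannelController(buffer int) *channelController {
	return &channelController{
		breakpoints: map[string]bool{},
		Requests:    make(chan stepRequest, buffer),
	}
}

func (c *channelController) SetBreakpoint(id string) { c.breakpoints[id] = true }

// Before blocks at breakpoint nodes until a decision arrives on the reply
// channel; nodes without a breakpoint continue immediately.
func (c *channelController) Before(nodeID string) action {
	if !c.breakpoints[nodeID] {
		return actionContinue
	}
	reply := make(chan action, 1)
	c.Requests <- stepRequest{NodeID: nodeID, Reply: reply}
	return <-reply
}

// runWithController visits ids in order and returns the ones it executed.
func runWithController(ids []string, c *channelController) (executed []string, aborted bool) {
	for _, id := range ids {
		switch c.Before(id) {
		case actionSkip:
			continue // skip this node, move on to the next one
		case actionAbort:
			return executed, true
		}
		executed = append(executed, id)
	}
	return executed, false
}

func main() {
	ctrl := newChannelController(10)
	ctrl.SetBreakpoint("review")

	// Stand-in for a human reviewer or debugger UI: skip every paused step.
	go func() {
		for req := range ctrl.Requests {
			fmt.Println("paused at", req.NodeID)
			req.Reply <- actionSkip
		}
	}()

	executed, aborted := runWithController([]string{"draft", "review", "send"}, ctrl)
	close(ctrl.Requests)
	fmt.Println(executed, aborted) // prints [draft send] false
}
```

The reply channel is buffered with capacity one so the responder never blocks on sending its decision.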

A workflow moves through four stages:

  • Authoring: Agent/Task schema or Graph IR
  • Loading/compilation: Agent/Task compiles to Graph IR
  • Hydration: Graph definitions become live nodes with providers/tool registry
  • Execution: Runtime runs graph with timeout, events, and optional persistence via daemon APIs