Graph
Directed node topology with entrypoint and edges.
Petal Flow is built around four core primitives: Graph, Node, Envelope, and Runtime.
Graph
Directed node topology with entrypoint and edges.
Node
Unit of execution that transforms envelope state.
Envelope
Shared run state (Vars, Messages, Artifacts, Trace).
Runtime
Executes graph, emits events, applies step control.
A graph defines execution flow by connecting nodes with directed edges.
g := petalflow.NewGraph("support-flow")_ = g.AddNode(validateNode)_ = g.AddNode(routeNode)_ = g.AddNode(handleNode)
_ = g.AddEdge("validate", "route")_ = g.AddEdge("route", "handle")_ = g.SetEntry("validate")For fluent composition, use graph.NewGraphBuilder(...) (AddNode, Edge, FanOut, Merge, Conditional, Build).
Nodes implement a small interface:
type Node interface { ID() string Kind() NodeKind Run(ctx context.Context, env *Envelope) (*Envelope, error)}LLMNode, ToolNodeRuleRouter, LLMRouter, GateNode, MergeNode, MapNode, HumanNodeTransformNode, FilterNode, CacheNodeGuardianNodeWebhookTriggerNode, WebhookCallNodetype SentimentNode struct { core.BaseNode client *nlp.Client}
func NewSentimentNode(id string, client *nlp.Client) *SentimentNode { return &SentimentNode{ BaseNode: core.NewBaseNode(id, core.NodeKindTransform), client: client, }}
func (n *SentimentNode) Run(ctx context.Context, env *core.Envelope) (*core.Envelope, error) { text := env.GetVarString("input") result, err := n.client.AnalyzeSentiment(ctx, text) if err != nil { return nil, err } env.SetVar("sentiment", result.Label) env.SetVar("score", result.Score) return env, nil}The envelope is the state container passed across nodes.
env := petalflow.NewEnvelope(). WithVar("ticket_id", "TKT-123"). WithVar("message", "I was charged twice")
// Read/write varsv, ok := env.GetVar("ticket_id")_ = ok_ = venv.SetVar("priority", "high")
// Message and artifact streamsenv.AppendMessage(petalflow.Message{Role: "user", Content: "help"})env.AppendArtifact(petalflow.Artifact{ID: "doc-1", Type: "citation", Text: "..."})Use Clone() for branch-safe execution.
The runtime executes a graph and emits structured events.
opts := petalflow.DefaultRunOptions()opts.Concurrency = 4opts.EventHandler = func(e petalflow.Event) { switch e.Kind { case petalflow.EventRunStarted: fmt.Println("run started") case petalflow.EventNodeStarted: fmt.Printf("node start: %s\n", e.NodeID) case petalflow.EventNodeFinished: fmt.Printf("node done: %s (%v)\n", e.NodeID, e.Elapsed) case petalflow.EventNodeFailed: fmt.Printf("node failed: %s (%v)\n", e.NodeID, e.Payload["error"]) case petalflow.EventRunFinished: fmt.Printf("run finished: %v\n", e.Payload["status"]) }}
result, err := petalflow.NewRuntime().Run(ctx, graph, env, opts)run.started, run.finishednode.started, node.finished, node.failedroute.decision, tool.call, tool.result, node.output.*step.paused, step.resumed, step.skipped, step.abortedStep controllers enable debug pauses and human-in-the-loop tooling.
ctrl := runtime.NewChannelStepController(10)ctrl.SetBreakpoint("review")
opts := petalflow.DefaultRunOptions()opts.StepController = ctrlopts.StepConfig = runtime.DefaultStepConfig()