Graph
The structure that defines your workflow as connected nodes
PetalFlow is built around a small set of core abstractions: Graphs, Nodes, Envelopes, and Runtimes. Understanding these concepts is essential for building effective workflows.
Graph
The structure that defines your workflow as connected nodes
Node
Individual units of execution that process data
Envelope
The data carrier that flows between nodes
Runtime
The engine that executes graphs and emits events
A Graph is a directed acyclic (or cyclic) graph of nodes with a defined entry point. Graphs define the control flow and structure of your workflow.
┌─────────┐ ┌─────────┐ ┌─────────┐│ Entry │────▶│ Process │────▶│ Output │└─────────┘ └─────────┘ └─────────┘ │ ▼ ┌─────────┐ │ Branch │ └─────────┘// Create an empty graphg := petalflow.NewGraph("my-workflow")
// Add nodesg.AddNode(parseNode)g.AddNode(processNode)g.AddNode(outputNode)
// Connect nodes with edgesg.AddEdge("parse", "process")g.AddEdge("process", "output")
// Set the entry pointg.SetEntry("parse")| Property | Description |
|---|---|
| Name | Identifier for the graph (used in logging and events) |
| Entry | The first node to execute |
| Nodes | Collection of nodes in the graph |
| Edges | Connections between nodes |
For complex graphs, use the fluent builder:
graph, err := petalflow.BuildGraph("support-flow", petalflow.WithEntry("validate"), petalflow.WithNodes(validateNode, classifyNode, routerNode, handleNode), petalflow.WithEdges( petalflow.Edge("validate", "classify"), petalflow.Edge("classify", "router"), petalflow.Edge("router", "billing_handler"), petalflow.Edge("router", "tech_handler"), petalflow.Edge("router", "general_handler"), ),)A Node is a unit of execution within a graph. Each node performs a specific operation—calling an LLM, transforming data, making routing decisions, or validating inputs.
All nodes implement a simple interface:
type Node interface { ID() string Execute(ctx context.Context, env *Envelope) error}┌─────────────────────────────────────────────────────┐│ Node Execution │├─────────────────────────────────────────────────────┤│ 1. EventNodeStart emitted ││ 2. Node receives Envelope ││ 3. Node reads input from Envelope.Vars ││ 4. Node performs its operation ││ 5. Node writes output to Envelope.Vars ││ 6. EventNodeEnd emitted (or EventNodeError) ││ 7. Runtime determines next node(s) via edges │└─────────────────────────────────────────────────────┘| Category | Nodes | Purpose |
|---|---|---|
| Reasoning | LLMNode | Call language models |
| Integration | ToolNode | Execute external tools and APIs |
| Routing | RuleRouter, LLMRouter | Direct flow based on conditions |
| Processing | TransformNode, FilterNode | Transform and filter data |
| Control Flow | MergeNode, HumanNode | Manage parallel branches and approvals |
| Validation | GuardianNode | Enforce constraints and validate data |
| Optimization | CacheNode | Cache expensive operations |
Create custom nodes for domain-specific logic:
type SentimentAnalyzer struct { id string client *nlp.Client}
func NewSentimentAnalyzer(id string, client *nlp.Client) *SentimentAnalyzer { return &SentimentAnalyzer{id: id, client: client}}
func (n *SentimentAnalyzer) ID() string { return n.id}
func (n *SentimentAnalyzer) Execute(ctx context.Context, env *petalflow.Envelope) error { text := env.GetVarString("input_text")
result, err := n.client.AnalyzeSentiment(ctx, text) if err != nil { return fmt.Errorf("sentiment analysis failed: %w", err) }
env.SetVar("sentiment", result.Label) env.SetVar("sentiment_score", result.Score) return nil}The Envelope is the data carrier that flows between nodes. It contains all the state and context needed for workflow execution.
┌─────────────────────────────────────────────────────┐│ Envelope │├─────────────────────────────────────────────────────┤│ Vars map[string]any Structured data ││ Messages []Message Chat history ││ Artifacts []Artifact Documents/files ││ Trace TraceInfo Execution metadata │└─────────────────────────────────────────────────────┘The primary way to pass data between nodes:
env := petalflow.NewEnvelope()
// Set valuesenv.SetVar("user_id", "usr_123")env.SetVar("query", "How do I reset my password?")env.SetVar("metadata", map[string]any{ "source": "web", "timestamp": time.Now(),})
// Get valuesuserID := env.GetVarString("user_id") // Type-safe stringquery := env.GetVar("query") // any typemetadata := env.GetVar("metadata").(map[string]any)
// Check existenceif env.HasVar("optional_field") { // ...}
// Delete valuesenv.DeleteVar("temporary_data")For LLM-based workflows, messages maintain conversation context:
// Add messagesenv.AddMessage(petalflow.Message{ Role: "system", Content: "You are a helpful assistant.",})
env.AddMessage(petalflow.Message{ Role: "user", Content: "What's the weather like?",})
env.AddMessage(petalflow.Message{ Role: "assistant", Content: "I don't have access to real-time weather data...",})
// Get all messagesmessages := env.Messages()
// Clear messagesenv.ClearMessages()Store documents, images, or other binary content:
// Add an artifactenv.AddArtifact(petalflow.Artifact{ ID: "doc_001", Name: "user_manual.pdf", ContentType: "application/pdf", Content: pdfBytes, Metadata: map[string]any{ "pages": 42, "author": "Documentation Team", },})
// Get artifactsartifacts := env.Artifacts()doc := env.GetArtifact("doc_001")Track execution context for debugging and observability:
// Trace is automatically populated by the runtimetrace := env.Trace()
fmt.Printf("Request ID: %s\n", trace.RequestID)fmt.Printf("Started: %v\n", trace.StartTime)fmt.Printf("Current Node: %s\n", trace.CurrentNode)fmt.Printf("Visited Nodes: %v\n", trace.VisitedNodes)For parallel branches, clone envelopes to prevent data races:
// Clone creates a deep copyclonedEnv := env.Clone()
// Modifications to clonedEnv don't affect originalclonedEnv.SetVar("branch_specific", "value")Edges connect nodes and define how execution flows through the graph.
| Type | Description |
|---|---|
| Simple Edge | Always followed (linear flow) |
| Router Edge | Conditionally followed based on router decision |
| Merge Edge | Converges parallel branches |
// Simple edge: always follows this pathg.AddEdge("parse", "process")
// Multiple outgoing edges from a routerg.AddEdge("router", "handler_a")g.AddEdge("router", "handler_b")g.AddEdge("router", "handler_c")
// Merge edges: multiple sources to one targetg.AddEdge("branch_a", "merge")g.AddEdge("branch_b", "merge")g.AddEdge("branch_c", "merge")Edges can carry metadata for observability:
g.AddEdgeWithMeta("classify", "router", petalflow.EdgeMeta{ Label: "classification_complete", Description: "Routes based on intent classification",})The Runtime executes graphs, manages node execution, handles parallel branches, and emits events throughout the workflow lifecycle.
runtime := petalflow.NewRuntime()
env := petalflow.NewEnvelope()env.SetVar("input", "Hello, world!")
result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{})if err != nil { log.Fatalf("Workflow failed: %v", err)}
output := result.GetVarString("output")| Option | Description |
|---|---|
EventHandler | Callback for runtime events |
StepController | Control execution flow (debugging) |
Timeout | Maximum execution time |
MaxParallel | Limit concurrent branch execution |
opts := petalflow.RunOptions{ EventHandler: func(event petalflow.Event) { log.Printf("[%s] %s", event.Kind, event.NodeID) }, Timeout: 30 * time.Second, MaxParallel: 5,}
result, err := runtime.Run(ctx, graph, env, opts)The runtime follows these rules:
┌─────────┐ │ Entry │ └────┬────┘ │ ┌────▼────┐ │ Fan │ │ Out │ └────┬────┘ ┌──────────┼──────────┐ ▼ ▼ ▼ ┌────────┐ ┌────────┐ ┌────────┐ │Branch A│ │Branch B│ │Branch C│ (parallel) └────┬───┘ └────┬───┘ └────┬───┘ │ │ │ └──────────┼──────────┘ ┌────▼────┐ │ Merge │ (waits for all) └────┬────┘ │ ┌────▼────┐ │ Output │ └─────────┘The runtime emits Events throughout execution for observability and debugging.
| Event | When Emitted |
|---|---|
EventGraphStart | Graph execution begins |
EventGraphEnd | Graph execution completes |
EventNodeStart | Before node executes |
EventNodeEnd | After node completes successfully |
EventNodeError | When node returns an error |
EventRouteDecision | When router makes a decision |
EventBranchStart | Parallel branch begins |
EventBranchMerge | Parallel branches merge |
EventCacheHit | Cache returns stored value |
EventCacheMiss | Cache has no stored value |
type Event struct { Kind EventKind // Type of event NodeID string // Node that triggered it Timestamp time.Time // When it occurred Duration time.Duration // Execution time (End events) Data map[string]any // Event-specific data Error error // Error details (Error events)}handler := func(event petalflow.Event) { switch event.Kind { case petalflow.EventNodeStart: fmt.Printf("Starting: %s\n", event.NodeID) case petalflow.EventNodeEnd: fmt.Printf("Completed: %s (%v)\n", event.NodeID, event.Duration) case petalflow.EventNodeError: fmt.Printf("Failed: %s - %v\n", event.NodeID, event.Error) }}Step Controllers enable fine-grained control over execution for debugging and human-in-the-loop workflows.
Pause at specific nodes:
controller := petalflow.NewBreakpointStepController([]string{"review", "approval"})opts := petalflow.NewStepRunOptions(controller)
// Execution pauses at "review" and "approval" nodesruntime.Run(ctx, graph, env, opts)type StepController interface { BeforeNode(ctx context.Context, nodeID string, env *Envelope) error AfterNode(ctx context.Context, nodeID string, env *Envelope) error}Routers and Guards control flow and enforce constraints within graphs.
Routers direct execution to different nodes based on conditions:
router := petalflow.NewRuleRouter("priority", petalflow.RuleRouterConfig{ Routes: []petalflow.RouteRule{ {When: petalflow.RouteCondition{Var: "urgent", Op: petalflow.OpEquals, Value: true}, To: "fast_track"}, }, Default: "standard",})Guards validate data and enforce constraints:
guard := petalflow.NewGuardianNode("validate", petalflow.GuardianNodeConfig{ Checks: []petalflow.GuardCheck{ {Var: "email", Op: petalflow.OpNotEmpty, Message: "Email required"}, }, OnFail: petalflow.GuardActionReject,})Here’s how the concepts work together in a complete workflow:
// 1. Create nodesvalidateNode := petalflow.NewGuardianNode("validate", guardConfig)classifyNode := petalflow.NewLLMRouter("classify", client, routerConfig)billingHandler := petalflow.NewLLMNode("billing", client, billingConfig)techHandler := petalflow.NewLLMNode("tech", client, techConfig)formatNode := petalflow.NewTransformNode("format", formatConfig)
// 2. Build graphg := petalflow.NewGraph("support-workflow")g.AddNode(validateNode)g.AddNode(classifyNode)g.AddNode(billingHandler)g.AddNode(techHandler)g.AddNode(formatNode)
g.AddEdge("validate", "classify")g.AddEdge("classify", "billing")g.AddEdge("classify", "tech")g.AddEdge("billing", "format")g.AddEdge("tech", "format")g.SetEntry("validate")
// 3. Create envelope with inputenv := petalflow.NewEnvelope()env.SetVar("ticket_id", "TKT-12345")env.SetVar("message", "I was charged twice for my subscription")
// 4. Execute with runtimeruntime := petalflow.NewRuntime()result, err := runtime.Run(ctx, g, env, petalflow.RunOptions{ EventHandler: loggingHandler,})
// 5. Extract outputresponse := result.GetVarString("formatted_response")