Skip to content

Petal Flow Concepts

PetalFlow is built around a small set of core abstractions: Graphs, Nodes, Envelopes, and Runtimes. Understanding these concepts is essential for building effective workflows.

Graph

The structure that defines your workflow as connected nodes

Node

Individual units of execution that process data

Envelope

The data carrier that flows between nodes

Runtime

The engine that executes graphs and emits events


A Graph is a directed acyclic (or cyclic) graph of nodes with a defined entry point. Graphs define the control flow and structure of your workflow.

┌─────────┐ ┌─────────┐ ┌─────────┐
│ Entry │────▶│ Process │────▶│ Output │
└─────────┘ └─────────┘ └─────────┘
┌─────────┐
│ Branch │
└─────────┘
// Create an empty graph
g := petalflow.NewGraph("my-workflow")
// Add nodes
g.AddNode(parseNode)
g.AddNode(processNode)
g.AddNode(outputNode)
// Connect nodes with edges
g.AddEdge("parse", "process")
g.AddEdge("process", "output")
// Set the entry point
g.SetEntry("parse")
PropertyDescription
NameIdentifier for the graph (used in logging and events)
EntryThe first node to execute
NodesCollection of nodes in the graph
EdgesConnections between nodes

For complex graphs, use the fluent builder:

graph, err := petalflow.BuildGraph("support-flow",
petalflow.WithEntry("validate"),
petalflow.WithNodes(validateNode, classifyNode, routerNode, handleNode),
petalflow.WithEdges(
petalflow.Edge("validate", "classify"),
petalflow.Edge("classify", "router"),
petalflow.Edge("router", "billing_handler"),
petalflow.Edge("router", "tech_handler"),
petalflow.Edge("router", "general_handler"),
),
)

A Node is a unit of execution within a graph. Each node performs a specific operation—calling an LLM, transforming data, making routing decisions, or validating inputs.

All nodes implement a simple interface:

type Node interface {
ID() string
Execute(ctx context.Context, env *Envelope) error
}
┌─────────────────────────────────────────────────────┐
│ Node Execution │
├─────────────────────────────────────────────────────┤
│ 1. EventNodeStart emitted │
│ 2. Node receives Envelope │
│ 3. Node reads input from Envelope.Vars │
│ 4. Node performs its operation │
│ 5. Node writes output to Envelope.Vars │
│ 6. EventNodeEnd emitted (or EventNodeError) │
│ 7. Runtime determines next node(s) via edges │
└─────────────────────────────────────────────────────┘
CategoryNodesPurpose
ReasoningLLMNodeCall language models
IntegrationToolNodeExecute external tools and APIs
RoutingRuleRouter, LLMRouterDirect flow based on conditions
ProcessingTransformNode, FilterNodeTransform and filter data
Control FlowMergeNode, HumanNodeManage parallel branches and approvals
ValidationGuardianNodeEnforce constraints and validate data
OptimizationCacheNodeCache expensive operations

Create custom nodes for domain-specific logic:

type SentimentAnalyzer struct {
id string
client *nlp.Client
}
func NewSentimentAnalyzer(id string, client *nlp.Client) *SentimentAnalyzer {
return &SentimentAnalyzer{id: id, client: client}
}
func (n *SentimentAnalyzer) ID() string {
return n.id
}
func (n *SentimentAnalyzer) Execute(ctx context.Context, env *petalflow.Envelope) error {
text := env.GetVarString("input_text")
result, err := n.client.AnalyzeSentiment(ctx, text)
if err != nil {
return fmt.Errorf("sentiment analysis failed: %w", err)
}
env.SetVar("sentiment", result.Label)
env.SetVar("sentiment_score", result.Score)
return nil
}

The Envelope is the data carrier that flows between nodes. It contains all the state and context needed for workflow execution.

┌─────────────────────────────────────────────────────┐
│ Envelope │
├─────────────────────────────────────────────────────┤
│ Vars map[string]any Structured data │
│ Messages []Message Chat history │
│ Artifacts []Artifact Documents/files │
│ Trace TraceInfo Execution metadata │
└─────────────────────────────────────────────────────┘

The primary way to pass data between nodes:

env := petalflow.NewEnvelope()
// Set values
env.SetVar("user_id", "usr_123")
env.SetVar("query", "How do I reset my password?")
env.SetVar("metadata", map[string]any{
"source": "web",
"timestamp": time.Now(),
})
// Get values
userID := env.GetVarString("user_id") // Type-safe string
query := env.GetVar("query") // any type
metadata := env.GetVar("metadata").(map[string]any)
// Check existence
if env.HasVar("optional_field") {
// ...
}
// Delete values
env.DeleteVar("temporary_data")

For LLM-based workflows, messages maintain conversation context:

// Add messages
env.AddMessage(petalflow.Message{
Role: "system",
Content: "You are a helpful assistant.",
})
env.AddMessage(petalflow.Message{
Role: "user",
Content: "What's the weather like?",
})
env.AddMessage(petalflow.Message{
Role: "assistant",
Content: "I don't have access to real-time weather data...",
})
// Get all messages
messages := env.Messages()
// Clear messages
env.ClearMessages()

Store documents, images, or other binary content:

// Add an artifact
env.AddArtifact(petalflow.Artifact{
ID: "doc_001",
Name: "user_manual.pdf",
ContentType: "application/pdf",
Content: pdfBytes,
Metadata: map[string]any{
"pages": 42,
"author": "Documentation Team",
},
})
// Get artifacts
artifacts := env.Artifacts()
doc := env.GetArtifact("doc_001")

Track execution context for debugging and observability:

// Trace is automatically populated by the runtime
trace := env.Trace()
fmt.Printf("Request ID: %s\n", trace.RequestID)
fmt.Printf("Started: %v\n", trace.StartTime)
fmt.Printf("Current Node: %s\n", trace.CurrentNode)
fmt.Printf("Visited Nodes: %v\n", trace.VisitedNodes)

For parallel branches, clone envelopes to prevent data races:

// Clone creates a deep copy
clonedEnv := env.Clone()
// Modifications to clonedEnv don't affect original
clonedEnv.SetVar("branch_specific", "value")

Edges connect nodes and define how execution flows through the graph.

TypeDescription
Simple EdgeAlways followed (linear flow)
Router EdgeConditionally followed based on router decision
Merge EdgeConverges parallel branches
// Simple edge: always follows this path
g.AddEdge("parse", "process")
// Multiple outgoing edges from a router
g.AddEdge("router", "handler_a")
g.AddEdge("router", "handler_b")
g.AddEdge("router", "handler_c")
// Merge edges: multiple sources to one target
g.AddEdge("branch_a", "merge")
g.AddEdge("branch_b", "merge")
g.AddEdge("branch_c", "merge")

Edges can carry metadata for observability:

g.AddEdgeWithMeta("classify", "router", petalflow.EdgeMeta{
Label: "classification_complete",
Description: "Routes based on intent classification",
})

The Runtime executes graphs, manages node execution, handles parallel branches, and emits events throughout the workflow lifecycle.

runtime := petalflow.NewRuntime()
env := petalflow.NewEnvelope()
env.SetVar("input", "Hello, world!")
result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{})
if err != nil {
log.Fatalf("Workflow failed: %v", err)
}
output := result.GetVarString("output")
OptionDescription
EventHandlerCallback for runtime events
StepControllerControl execution flow (debugging)
TimeoutMaximum execution time
MaxParallelLimit concurrent branch execution
opts := petalflow.RunOptions{
EventHandler: func(event petalflow.Event) {
log.Printf("[%s] %s", event.Kind, event.NodeID)
},
Timeout: 30 * time.Second,
MaxParallel: 5,
}
result, err := runtime.Run(ctx, graph, env, opts)

The runtime follows these rules:

  1. Entry First: Execution starts at the graph’s entry node
  2. Sequential by Default: Nodes execute in edge-defined order
  3. Parallel Branches: Fan-out edges execute concurrently
  4. Merge Waits: MergeNode waits for all incoming branches
  5. Router Decides: Router nodes determine which edge to follow
  6. Errors Halt: Node errors stop execution (unless handled)
┌─────────┐
│ Entry │
└────┬────┘
┌────▼────┐
│ Fan │
│ Out │
└────┬────┘
┌──────────┼──────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│Branch A│ │Branch B│ │Branch C│ (parallel)
└────┬───┘ └────┬───┘ └────┬───┘
│ │ │
└──────────┼──────────┘
┌────▼────┐
│ Merge │ (waits for all)
└────┬────┘
┌────▼────┐
│ Output │
└─────────┘

The runtime emits Events throughout execution for observability and debugging.

EventWhen Emitted
EventGraphStartGraph execution begins
EventGraphEndGraph execution completes
EventNodeStartBefore node executes
EventNodeEndAfter node completes successfully
EventNodeErrorWhen node returns an error
EventRouteDecisionWhen router makes a decision
EventBranchStartParallel branch begins
EventBranchMergeParallel branches merge
EventCacheHitCache returns stored value
EventCacheMissCache has no stored value
type Event struct {
Kind EventKind // Type of event
NodeID string // Node that triggered it
Timestamp time.Time // When it occurred
Duration time.Duration // Execution time (End events)
Data map[string]any // Event-specific data
Error error // Error details (Error events)
}
handler := func(event petalflow.Event) {
switch event.Kind {
case petalflow.EventNodeStart:
fmt.Printf("Starting: %s\n", event.NodeID)
case petalflow.EventNodeEnd:
fmt.Printf("Completed: %s (%v)\n", event.NodeID, event.Duration)
case petalflow.EventNodeError:
fmt.Printf("Failed: %s - %v\n", event.NodeID, event.Error)
}
}

Step Controllers enable fine-grained control over execution for debugging and human-in-the-loop workflows.

Pause at specific nodes:

controller := petalflow.NewBreakpointStepController([]string{"review", "approval"})
opts := petalflow.NewStepRunOptions(controller)
// Execution pauses at "review" and "approval" nodes
runtime.Run(ctx, graph, env, opts)
type StepController interface {
BeforeNode(ctx context.Context, nodeID string, env *Envelope) error
AfterNode(ctx context.Context, nodeID string, env *Envelope) error
}

Routers and Guards control flow and enforce constraints within graphs.

Routers direct execution to different nodes based on conditions:

  • RuleRouter: Deterministic routing based on envelope values
  • LLMRouter: Semantic routing using LLM classification
router := petalflow.NewRuleRouter("priority", petalflow.RuleRouterConfig{
Routes: []petalflow.RouteRule{
{When: petalflow.RouteCondition{Var: "urgent", Op: petalflow.OpEquals, Value: true}, To: "fast_track"},
},
Default: "standard",
})

Guards validate data and enforce constraints:

guard := petalflow.NewGuardianNode("validate", petalflow.GuardianNodeConfig{
Checks: []petalflow.GuardCheck{
{Var: "email", Op: petalflow.OpNotEmpty, Message: "Email required"},
},
OnFail: petalflow.GuardActionReject,
})

Here’s how the concepts work together in a complete workflow:

// 1. Create nodes
validateNode := petalflow.NewGuardianNode("validate", guardConfig)
classifyNode := petalflow.NewLLMRouter("classify", client, routerConfig)
billingHandler := petalflow.NewLLMNode("billing", client, billingConfig)
techHandler := petalflow.NewLLMNode("tech", client, techConfig)
formatNode := petalflow.NewTransformNode("format", formatConfig)
// 2. Build graph
g := petalflow.NewGraph("support-workflow")
g.AddNode(validateNode)
g.AddNode(classifyNode)
g.AddNode(billingHandler)
g.AddNode(techHandler)
g.AddNode(formatNode)
g.AddEdge("validate", "classify")
g.AddEdge("classify", "billing")
g.AddEdge("classify", "tech")
g.AddEdge("billing", "format")
g.AddEdge("tech", "format")
g.SetEntry("validate")
// 3. Create envelope with input
env := petalflow.NewEnvelope()
env.SetVar("ticket_id", "TKT-12345")
env.SetVar("message", "I was charged twice for my subscription")
// 4. Execute with runtime
runtime := petalflow.NewRuntime()
result, err := runtime.Run(ctx, g, env, petalflow.RunOptions{
EventHandler: loggingHandler,
})
// 5. Extract output
response := result.GetVarString("formatted_response")