Petal Flow Graph API
The graph package provides the directed graph model for PetalFlow workflows.
import "github.com/petal-labs/petalflow/graph"Or use the top-level re-exports:
import "github.com/petal-labs/petalflow"Type Reference
Section titled “Type Reference”| Type | Purpose |
|---|---|
Graph | Graph interface for workflow structure |
BasicGraph | Standard graph implementation |
Edge | Directed connection between nodes |
GraphBuilder | Fluent API for constructing graphs |
FanOutBranch | Helper for parallel branch patterns |
Graph Interface
Section titled “Graph Interface”type Graph interface { // Name returns the graph's identifier. Name() string
// Nodes returns all nodes in the graph. Nodes() []core.Node
// Edges returns all edges in the graph. Edges() []Edge
// NodeByID retrieves a node by its ID. NodeByID(id string) (core.Node, bool)
// Entry returns the entry node ID for execution. Entry() string
// Successors returns the IDs of nodes that follow the given node. Successors(nodeID string) []string
// Predecessors returns the IDs of nodes that precede the given node. Predecessors(nodeID string) []string}BasicGraph
Section titled “BasicGraph”The standard implementation of the Graph interface.
Constructor
Section titled “Constructor”func NewGraph(name string) *BasicGraphMethods
Section titled “Methods”| Method | Description |
|---|---|
Name() string | Returns the graph’s identifier |
Nodes() []Node | Returns all nodes in insertion order |
Edges() []Edge | Returns all edges |
NodeByID(id string) (Node, bool) | Retrieves a node by ID |
Entry() string | Returns the entry node ID |
Successors(nodeID string) []string | Returns successor node IDs |
Predecessors(nodeID string) []string | Returns predecessor node IDs |
AddNode(node Node) error | Adds a node to the graph |
AddEdge(from, to string) error | Adds a directed edge |
SetEntry(nodeID string) error | Sets the entry node |
Validate() error | Checks graph for common issues |
TopologicalSort(allowCycles bool) ([]string, error) | Returns nodes in topological order |
Reachable(startID string) []string | Returns all reachable node IDs |
Example
Section titled “Example”// Create a graph manuallyg := graph.NewGraph("my-workflow")
// Add nodesg.AddNode(inputNode)g.AddNode(processNode)g.AddNode(outputNode)
// Connect nodes with edgesg.AddEdge("input", "process")g.AddEdge("process", "output")
// Set entry pointg.SetEntry("input")
// Validate before useif err := g.Validate(); err != nil { log.Fatal(err)}Represents a directed connection between two nodes:
type Edge struct { From string // source node ID To string // target node ID}GraphBuilder
Section titled “GraphBuilder”The GraphBuilder provides a fluent API for constructing workflow graphs with method chaining.
Constructor
Section titled “Constructor”func NewGraphBuilder(name string) *GraphBuilderMethods
Section titled “Methods”| Method | Description |
|---|---|
AddNode(node) *GraphBuilder | Adds a node and makes it current |
Entry(nodeID) *GraphBuilder | Sets the entry node |
Edge(node) *GraphBuilder | Adds a node connected from current |
EdgeTo(nodeID) *GraphBuilder | Creates edge to existing node |
Connect(fromID, toID) *GraphBuilder | Creates edge between existing nodes |
FanOut(nodes...) *GraphBuilder | Splits to parallel branches |
FanOutTo(nodeIDs...) *GraphBuilder | Splits to existing nodes |
Merge(mergeNode) *GraphBuilder | Combines parallel branches |
MergeTo(nodeID) *GraphBuilder | Merges to existing node |
Branch(nodeID) *GraphBuilder | Switches current to existing node |
Branches(nodeIDs...) *GraphBuilder | Sets multiple current nodes |
WithNodes(nodes...) *GraphBuilder | Adds nodes without edges |
Current() []string | Returns current node IDs |
Errors() []error | Returns accumulated errors |
Build() (Graph, error) | Validates and returns the graph |
MustBuild() Graph | Like Build but panics on error |
Basic Usage
Section titled “Basic Usage”graph, err := graph.NewGraphBuilder("hello-world"). AddNode(core.NewFuncNode("greet", greetFunc)). Edge(core.NewFuncNode("format", formatFunc)). Edge(core.NewFuncNode("output", outputFunc)). Build()Linear Pipeline
Section titled “Linear Pipeline”graph, err := graph.NewGraphBuilder("pipeline"). AddNode(inputNode). // Entry node (first node) Edge(validateNode). // validate follows input Edge(processNode). // process follows validate Edge(outputNode). // output follows process Build()Fan-Out Pattern
Section titled “Fan-Out Pattern”Split execution to parallel branches:
graph, err := graph.NewGraphBuilder("parallel"). AddNode(inputNode). FanOut( branchANode, branchBNode, branchCNode, ). // All three branches are now "current" Merge(mergeNode). Edge(outputNode). Build()Complex Branches
Section titled “Complex Branches”Use FanOutBranches for branches with multiple steps:
graph, err := graph.NewGraphBuilder("complex"). AddNode(inputNode). FanOutBranches( graph.NewPipelineBranch(fetchNode, parseNode, validateNode), graph.NewPipelineBranch(cacheCheckNode, cacheReturnNode), graph.NewBranch(directNode), ). Merge(mergeNode). Build()Conditional Routing
Section titled “Conditional Routing”graph, err := graph.NewGraphBuilder("routing"). AddNode(inputNode). Conditional( routerNode, handlerANode, handlerBNode, handlerCNode, ). Merge(mergeNode). Edge(outputNode). Build()Non-Linear Graphs
Section titled “Non-Linear Graphs”Use Branch() to switch context for building complex topologies:
builder := graph.NewGraphBuilder("complex")
// Build main pathbuilder. AddNode(entryNode). Edge(processNode). Edge(outputNode)
// Go back and add alternative path from entrybuilder. Branch("entry"). // Switch current to entry Edge(alternativeNode). // Add edge from entry EdgeTo("output") // Connect to existing output
graph, err := builder.Build()Adding Nodes Without Edges
Section titled “Adding Nodes Without Edges”builder := graph.NewGraphBuilder("with-extras"). WithNodes(nodeA, nodeB, nodeC). // Add without connecting AddNode(entryNode). // Entry node Edge(nodeA). // Now connect Edge(nodeB). Edge(nodeC)
graph, err := builder.Build()FanOutBranch
Section titled “FanOutBranch”Helper type for building complex fan-out patterns:
type FanOutBranch struct { Entry core.Node // First node of the branch Nodes []core.Node // Additional nodes in sequence}Constructors
Section titled “Constructors”// Single-node branchfunc NewBranch(node core.Node) FanOutBranch
// Multi-node pipeline branchfunc NewPipelineBranch(nodes ...core.Node) FanOutBranchExample
Section titled “Example”branches := []graph.FanOutBranch{ // Simple single-node branch graph.NewBranch(simpleNode),
// Pipeline branch: fetch -> parse -> validate graph.NewPipelineBranch(fetchNode, parseNode, validateNode),
// Another pipeline graph.NewPipelineBranch(cacheNode, transformNode),}
g, err := graph.NewGraphBuilder("complex"). AddNode(inputNode). FanOutBranches(branches...). Merge(mergeNode). Build()Convenience Functions
Section titled “Convenience Functions”BuildGraph
Section titled “BuildGraph”Creates a simple linear graph from nodes:
func BuildGraph(name string, nodes ...Node) (Graph, error)// All nodes are connected in sequenceg, err := petalflow.BuildGraph("linear", inputNode, processNode, outputNode,)MustBuildGraph
Section titled “MustBuildGraph”Like BuildGraph but panics on error. Useful in tests:
func MustBuildGraph(name string, nodes ...Node) Graphg := petalflow.MustBuildGraph("test-graph", inputNode, processNode, outputNode,)Errors
Section titled “Errors”| Error | Cause |
|---|---|
ErrNodeNotFound | Referenced node doesn’t exist in graph |
ErrDuplicateNode | Node with same ID already exists |
ErrInvalidEdge | Edge references non-existent node |
ErrNoEntryNode | No entry node defined |
ErrCycleDetected | Graph contains cycle (when not allowed) |
ErrEmptyGraph | Graph has no nodes |
ErrNodeAlreadyAdded | Node already added to graph |
Graph Patterns
Section titled “Graph Patterns”Router with Multiple Handlers
Section titled “Router with Multiple Handlers”g, err := graph.NewGraphBuilder("support-router"). AddNode(inputNode). Edge(classifyNode). Conditional( router, billingHandler, technicalHandler, generalHandler, ). Merge(formatNode). Edge(outputNode). Build()Parallel Processing with Merge
Section titled “Parallel Processing with Merge”g, err := graph.NewGraphBuilder("parallel-analysis"). AddNode(inputNode). FanOut( sentimentNode, entityNode, summaryNode, ). Merge(combineNode). Edge(outputNode). Build()Loop Pattern (Revise Loop)
Section titled “Loop Pattern (Revise Loop)”builder := graph.NewGraphBuilder("revise-loop")
builder. AddNode(generateNode). Edge(validateNode). Edge(routerNode). Conditional( router, outputNode, // Valid output reviseNode, // Needs revision )
// Create loop back from revise to generatebuilder.Connect("revise", "generate")
g, err := builder.Build()DAG Validation
Section titled “DAG Validation”Check if a graph is a valid DAG:
g := graph.NewGraph("my-graph")// ... add nodes and edges ...
sorted, err := g.TopologicalSort(false) // allowCycles=falseif err != nil { if errors.Is(err, graph.ErrCycleDetected) { log.Println("Graph contains cycles") }}Best Practices
Section titled “Best Practices”| Practice | Recommendation |
|---|---|
| Graph naming | Use descriptive names like support-router, data-pipeline |
| Entry node | First node added becomes entry by default |
| Validation | Always call Build() or Validate() before running |
| Fan-out/merge | Always pair FanOut with Merge to combine branches |
| Cycles | Use sparingly; set MaxHops to prevent infinite loops |
| Testing | Use MustBuild() in tests to catch errors early |