Skip to content

PetalFlow Graph API

The graph package provides the directed graph model for PetalFlow workflows.

import "github.com/petal-labs/petalflow/graph"

Or use the top-level re-exports:

import "github.com/petal-labs/petalflow"

| Type | Purpose |
| --- | --- |
| `Graph` | Graph interface for workflow structure |
| `BasicGraph` | Standard graph implementation |
| `Edge` | Directed connection between nodes |
| `GraphBuilder` | Fluent API for constructing graphs |
| `FanOutBranch` | Helper for parallel branch patterns |

type Graph interface {
// Name returns the graph's identifier.
Name() string
// Nodes returns all nodes in the graph.
Nodes() []core.Node
// Edges returns all edges in the graph.
Edges() []Edge
// NodeByID retrieves a node by its ID.
NodeByID(id string) (core.Node, bool)
// Entry returns the entry node ID for execution.
Entry() string
// Successors returns the IDs of nodes that follow the given node.
Successors(nodeID string) []string
// Predecessors returns the IDs of nodes that precede the given node.
Predecessors(nodeID string) []string
}

BasicGraph is the standard implementation of the Graph interface.

func NewGraph(name string) *BasicGraph
| Method | Description |
| --- | --- |
| `Name() string` | Returns the graph’s identifier |
| `Nodes() []Node` | Returns all nodes in insertion order |
| `Edges() []Edge` | Returns all edges |
| `NodeByID(id string) (Node, bool)` | Retrieves a node by ID |
| `Entry() string` | Returns the entry node ID |
| `Successors(nodeID string) []string` | Returns successor node IDs |
| `Predecessors(nodeID string) []string` | Returns predecessor node IDs |
| `AddNode(node Node) error` | Adds a node to the graph |
| `AddEdge(from, to string) error` | Adds a directed edge |
| `SetEntry(nodeID string) error` | Sets the entry node |
| `Validate() error` | Checks graph for common issues |
| `TopologicalSort(allowCycles bool) ([]string, error)` | Returns nodes in topological order |
| `Reachable(startID string) []string` | Returns all reachable node IDs |

// Create a graph manually
g := graph.NewGraph("my-workflow")
// Add nodes
g.AddNode(inputNode)
g.AddNode(processNode)
g.AddNode(outputNode)
// Connect nodes with edges
g.AddEdge("input", "process")
g.AddEdge("process", "output")
// Set entry point
g.SetEntry("input")
// Validate before use
if err := g.Validate(); err != nil {
log.Fatal(err)
}

Edge represents a directed connection between two nodes:

type Edge struct {
From string // source node ID
To string // target node ID
}

The GraphBuilder provides a fluent API for constructing workflow graphs with method chaining.

func NewGraphBuilder(name string) *GraphBuilder
| Method | Description |
| --- | --- |
| `AddNode(node) *GraphBuilder` | Adds a node and makes it current |
| `Entry(nodeID) *GraphBuilder` | Sets the entry node |
| `Edge(node) *GraphBuilder` | Adds a node connected from current |
| `EdgeTo(nodeID) *GraphBuilder` | Creates edge to existing node |
| `Connect(fromID, toID) *GraphBuilder` | Creates edge between existing nodes |
| `FanOut(nodes...) *GraphBuilder` | Splits to parallel branches |
| `FanOutTo(nodeIDs...) *GraphBuilder` | Splits to existing nodes |
| `Merge(mergeNode) *GraphBuilder` | Combines parallel branches |
| `MergeTo(nodeID) *GraphBuilder` | Merges to existing node |
| `Branch(nodeID) *GraphBuilder` | Switches current to existing node |
| `Branches(nodeIDs...) *GraphBuilder` | Sets multiple current nodes |
| `WithNodes(nodes...) *GraphBuilder` | Adds nodes without edges |
| `Current() []string` | Returns current node IDs |
| `Errors() []error` | Returns accumulated errors |
| `Build() (Graph, error)` | Validates and returns the graph |
| `MustBuild() Graph` | Like Build but panics on error |

graph, err := graph.NewGraphBuilder("hello-world").
AddNode(core.NewFuncNode("greet", greetFunc)).
Edge(core.NewFuncNode("format", formatFunc)).
Edge(core.NewFuncNode("output", outputFunc)).
Build()
graph, err := graph.NewGraphBuilder("pipeline").
AddNode(inputNode). // Entry node (first node)
Edge(validateNode). // validate follows input
Edge(processNode). // process follows validate
Edge(outputNode). // output follows process
Build()

Use FanOut to split execution into parallel branches:

graph, err := graph.NewGraphBuilder("parallel").
AddNode(inputNode).
FanOut(
branchANode,
branchBNode,
branchCNode,
).
// All three branches are now "current"
Merge(mergeNode).
Edge(outputNode).
Build()

Use FanOutBranches for branches with multiple steps:

graph, err := graph.NewGraphBuilder("complex").
AddNode(inputNode).
FanOutBranches(
graph.NewPipelineBranch(fetchNode, parseNode, validateNode),
graph.NewPipelineBranch(cacheCheckNode, cacheReturnNode),
graph.NewBranch(directNode),
).
Merge(mergeNode).
Build()
graph, err := graph.NewGraphBuilder("routing").
AddNode(inputNode).
Conditional(
routerNode,
handlerANode,
handlerBNode,
handlerCNode,
).
Merge(mergeNode).
Edge(outputNode).
Build()

Use Branch() to switch context for building complex topologies:

builder := graph.NewGraphBuilder("complex")
// Build main path
builder.
AddNode(entryNode).
Edge(processNode).
Edge(outputNode)
// Go back and add alternative path from entry
builder.
Branch("entry"). // Switch current to entry
Edge(alternativeNode). // Add edge from entry
EdgeTo("output") // Connect to existing output
graph, err := builder.Build()
builder := graph.NewGraphBuilder("with-extras").
WithNodes(nodeA, nodeB, nodeC). // Add without connecting
AddNode(entryNode). // Entry node
Edge(nodeA). // Now connect
Edge(nodeB).
Edge(nodeC)
graph, err := builder.Build()

FanOutBranch is a helper type for building complex fan-out patterns:

type FanOutBranch struct {
Entry core.Node // First node of the branch
Nodes []core.Node // Additional nodes in sequence
}
// Single-node branch
func NewBranch(node core.Node) FanOutBranch
// Multi-node pipeline branch
func NewPipelineBranch(nodes ...core.Node) FanOutBranch
branches := []graph.FanOutBranch{
// Simple single-node branch
graph.NewBranch(simpleNode),
// Pipeline branch: fetch -> parse -> validate
graph.NewPipelineBranch(fetchNode, parseNode, validateNode),
// Another pipeline
graph.NewPipelineBranch(cacheNode, transformNode),
}
g, err := graph.NewGraphBuilder("complex").
AddNode(inputNode).
FanOutBranches(branches...).
Merge(mergeNode).
Build()

BuildGraph creates a simple linear graph from nodes:

func BuildGraph(name string, nodes ...Node) (Graph, error)
// All nodes are connected in sequence
g, err := petalflow.BuildGraph("linear",
inputNode,
processNode,
outputNode,
)

MustBuildGraph is like BuildGraph but panics on error. It is useful in tests:

func MustBuildGraph(name string, nodes ...Node) Graph
g := petalflow.MustBuildGraph("test-graph",
inputNode,
processNode,
outputNode,
)

| Error | Cause |
| --- | --- |
| `ErrNodeNotFound` | Referenced node doesn’t exist in graph |
| `ErrDuplicateNode` | Node with same ID already exists |
| `ErrInvalidEdge` | Edge references non-existent node |
| `ErrNoEntryNode` | No entry node defined |
| `ErrCycleDetected` | Graph contains cycle (when not allowed) |
| `ErrEmptyGraph` | Graph has no nodes |
| `ErrNodeAlreadyAdded` | Node already added to graph |

g, err := graph.NewGraphBuilder("support-router").
AddNode(inputNode).
Edge(classifyNode).
Conditional(
router,
billingHandler,
technicalHandler,
generalHandler,
).
Merge(formatNode).
Edge(outputNode).
Build()
g, err := graph.NewGraphBuilder("parallel-analysis").
AddNode(inputNode).
FanOut(
sentimentNode,
entityNode,
summaryNode,
).
Merge(combineNode).
Edge(outputNode).
Build()
builder := graph.NewGraphBuilder("revise-loop")
builder.
AddNode(generateNode).
Edge(validateNode).
Edge(routerNode).
Conditional(
router,
outputNode, // Valid output
reviseNode, // Needs revision
)
// Create loop back from revise to generate
builder.Connect("revise", "generate")
g, err := builder.Build()

Check if a graph is a valid DAG:

g := graph.NewGraph("my-graph")
// ... add nodes and edges ...
sorted, err := g.TopologicalSort(false) // allowCycles=false
if err != nil {
if errors.Is(err, graph.ErrCycleDetected) {
log.Println("Graph contains cycles")
}
}

| Practice | Recommendation |
| --- | --- |
| Graph naming | Use descriptive names like `support-router`, `data-pipeline` |
| Entry node | The first node added becomes the entry node by default |
| Validation | Always call `Build()` or `Validate()` before running |
| Fan-out/merge | Always pair `FanOut` with `Merge` to combine branches |
| Cycles | Use sparingly; set `MaxHops` to prevent infinite loops |
| Testing | Use `MustBuild()` in tests to catch errors early |