Skip to content

Graph Design

Well-designed graphs are easier to test, debug, and maintain. This guide covers patterns and best practices for building PetalFlow graphs that scale with complexity.

The simplest pattern connects nodes in sequence. Data flows through each node in order.

g := petalflow.NewGraph("linear-pipeline")
g.AddNode(petalflow.NewTransformNode("parse", parseConfig))
g.AddNode(petalflow.NewLLMNode("classify", client, classifyConfig))
g.AddNode(petalflow.NewTransformNode("format", formatConfig))
g.AddEdge("parse", "classify")
g.AddEdge("classify", "format")
g.SetEntry("parse")

Use linear graphs for straightforward pipelines where every input follows the same path.

Router nodes create branches based on conditions. Each branch handles a different case.

g := petalflow.NewGraph("branching-workflow")
router := petalflow.NewRuleRouter("priority_check", petalflow.RuleRouterConfig{
Routes: []petalflow.RouteRule{
{When: petalflow.RouteCondition{Var: "priority", Op: petalflow.OpEquals, Value: "high"}, To: "urgent_handler"},
{When: petalflow.RouteCondition{Var: "priority", Op: petalflow.OpEquals, Value: "low"}, To: "standard_handler"},
},
Default: "standard_handler",
})
g.AddNode(router)
g.AddNode(petalflow.NewLLMNode("urgent_handler", client, urgentConfig))
g.AddNode(petalflow.NewLLMNode("standard_handler", client, standardConfig))
g.AddEdge("priority_check", "urgent_handler")
g.AddEdge("priority_check", "standard_handler")
g.SetEntry("priority_check")

Fan-out sends data to multiple nodes simultaneously. Fan-in merges results back together.

g := petalflow.NewGraph("parallel-analysis")
// Fan-out: one source to multiple processors
g.AddNode(petalflow.NewTransformNode("split", splitConfig))
g.AddNode(petalflow.NewLLMNode("sentiment", client, sentimentConfig))
g.AddNode(petalflow.NewLLMNode("entities", client, entityConfig))
g.AddNode(petalflow.NewLLMNode("summary", client, summaryConfig))
// Fan-in: merge results
g.AddNode(petalflow.NewMergeNode("combine", petalflow.MergeNodeConfig{
InputKeys: []string{"sentiment_result", "entities_result", "summary_result"},
OutputKey: "analysis",
MergeStyle: petalflow.MergeStyleObject,
}))
g.AddEdge("split", "sentiment")
g.AddEdge("split", "entities")
g.AddEdge("split", "summary")
g.AddEdge("sentiment", "combine")
g.AddEdge("entities", "combine")
g.AddEdge("summary", "combine")
g.SetEntry("split")

The runtime executes parallel branches concurrently. MergeNode waits for all inputs before continuing.

Loops enable iterative refinement. A guard or router decides when to exit.

g := petalflow.NewGraph("refinement-loop")
g.AddNode(petalflow.NewLLMNode("draft", client, draftConfig))
g.AddNode(petalflow.NewLLMNode("critique", client, critiqueConfig))
g.AddNode(petalflow.NewRuleRouter("quality_check", petalflow.RuleRouterConfig{
Routes: []petalflow.RouteRule{
{When: petalflow.RouteCondition{Var: "score", Op: petalflow.OpGte, Value: 8}, To: "output"},
{When: petalflow.RouteCondition{Var: "iterations", Op: petalflow.OpGte, Value: 3}, To: "output"},
},
Default: "draft", // Loop back for another iteration
}))
g.AddNode(petalflow.NewTransformNode("output", outputConfig))
g.AddEdge("draft", "critique")
g.AddEdge("critique", "quality_check")
g.AddEdge("quality_check", "draft") // Loop edge
g.AddEdge("quality_check", "output") // Exit edge
g.SetEntry("draft")

Split workflows into logical stages. Each stage has a clear responsibility:

StagePurposeTypical Nodes
IngestionParse, validate, and normalize inputsTransformNode, GuardianNode
EnrichmentAdd context, retrieve data, call APIsToolNode, CacheNode
ReasoningLLM calls, classification, generationLLMNode, LLMRouter
ValidationCheck outputs, enforce constraintsGuardianNode, FilterNode
OutputFormat, transform, and deliver resultsTransformNode
// Stage boundaries make graphs easier to understand
g := petalflow.NewGraph("staged-workflow")
// Ingestion stage
g.AddNode(petalflow.NewTransformNode("parse_input", parseConfig))
g.AddNode(petalflow.NewGuardianNode("validate_input", inputGuardConfig))
// Enrichment stage
g.AddNode(petalflow.NewToolNode("fetch_context", fetchToolConfig))
g.AddNode(petalflow.NewCacheNode("cache_context", cacheConfig))
// Reasoning stage
g.AddNode(petalflow.NewLLMNode("generate", client, generateConfig))
// Validation stage
g.AddNode(petalflow.NewGuardianNode("check_output", outputGuardConfig))
// Output stage
g.AddNode(petalflow.NewTransformNode("format_response", formatConfig))

Define edges explicitly rather than relying on implicit ordering. Explicit edges make routing decisions observable, testable, and easier to debug.

// Explicit edges - clear and observable
g.AddEdge("parse", "classify")
g.AddEdge("classify", "route")
g.AddEdge("route", "handler_a")
g.AddEdge("route", "handler_b")

Benefits of explicit edges:

  • Visibility: Graph structure is clear from code
  • Testability: Edge connections can be verified in tests
  • Debugging: Event logs show exact paths taken
  • Documentation: Graph serves as living documentation

Envelopes carry data between nodes. Establish conventions for key names and document them.

// Input keys - what the workflow receives
"input" // Raw user input
"query" // Search or question text
"document" // Document to process
"context" // Additional context
// Processing keys - intermediate results
"parsed_input" // After parsing/normalization
"enriched_data" // After adding context
"classification" // Category or intent
"draft_response" // Before validation
// Output keys - final results
"response" // Main output
"metadata" // Execution metadata
"citations" // Source references
"confidence" // Confidence scores
// In node configuration, reference envelope keys with templates
config := petalflow.LLMNodeConfig{
PromptTemplate: `Context: {{.Vars.context}}
Question: {{.Vars.query}}
Provide a detailed answer.`,
OutputKey: "response",
}
// In custom nodes, use the envelope API
func (n *CustomNode) Execute(ctx context.Context, env *petalflow.Envelope) error {
query := env.GetVar("query")
context := env.GetVar("context")
result := process(query, context)
env.SetVar("result", result)
return nil
}

Use GuardianNode to validate data at stage boundaries:

inputGuard := petalflow.NewGuardianNode("validate_input", petalflow.GuardianNodeConfig{
Checks: []petalflow.GuardCheck{
{Var: "query", Op: petalflow.OpNotEmpty, Message: "Query cannot be empty"},
{Var: "query", Op: petalflow.OpMaxLength, Value: 10000, Message: "Query too long"},
},
OnFail: petalflow.GuardActionReject,
})

Route errors to dedicated handlers instead of failing silently:

router := petalflow.NewRuleRouter("error_check", petalflow.RuleRouterConfig{
Routes: []petalflow.RouteRule{
{When: petalflow.RouteCondition{Var: "error", Op: petalflow.OpNotEmpty}, To: "error_handler"},
},
Default: "continue_processing",
})

Provide fallback paths when optional steps fail:

g.AddNode(petalflow.NewToolNode("primary_source", primaryConfig))
g.AddNode(petalflow.NewToolNode("fallback_source", fallbackConfig))
g.AddNode(petalflow.NewRuleRouter("source_check", petalflow.RuleRouterConfig{
Routes: []petalflow.RouteRule{
{When: petalflow.RouteCondition{Var: "primary_result", Op: petalflow.OpNotEmpty}, To: "process"},
},
Default: "fallback_source",
}))

Break large workflows into reusable subgraphs:

// Define a reusable subgraph
func buildValidationSubgraph() *petalflow.BasicGraph {
sub := petalflow.NewGraph("validation")
sub.AddNode(petalflow.NewGuardianNode("format_check", formatGuardConfig))
sub.AddNode(petalflow.NewGuardianNode("content_check", contentGuardConfig))
sub.AddEdge("format_check", "content_check")
sub.SetEntry("format_check")
return sub
}
// Embed in parent graph
main := petalflow.NewGraph("main-workflow")
main.AddSubgraph("validate", buildValidationSubgraph())
main.AddEdge("generate", "validate")
main.AddEdge("validate", "output")
func TestClassifyNode(t *testing.T) {
node := petalflow.NewLLMNode("classify", mockClient, classifyConfig)
env := petalflow.NewEnvelope()
env.SetVar("input", "I need help with billing")
err := node.Execute(context.Background(), env)
require.NoError(t, err)
classification := env.GetVar("classification")
assert.Equal(t, "billing", classification)
}
func TestSupportWorkflow(t *testing.T) {
graph := buildSupportGraph(mockClient)
runtime := petalflow.NewRuntime()
env := petalflow.NewEnvelope()
env.SetVar("ticket", testTicket)
result, err := runtime.Run(context.Background(), graph, env, petalflow.RunOptions{})
require.NoError(t, err)
assert.NotEmpty(t, result.GetVar("response"))
assert.Equal(t, "resolved", result.GetVar("status"))
}
func TestRoutingPaths(t *testing.T) {
var visitedNodes []string
handler := func(event petalflow.Event) {
if event.Kind == petalflow.EventNodeStart {
visitedNodes = append(visitedNodes, event.NodeID)
}
}
runtime := petalflow.NewRuntime()
_, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{EventHandler: handler})
require.NoError(t, err)
// Verify expected path was taken
assert.Equal(t, []string{"entry", "router", "handler_b", "output"}, visitedNodes)
}